from quantum.plugins.nicira import nicira_networkgw_db as networkgw_db
from quantum.plugins.nicira import nicira_qos_db as qos_db
from quantum.plugins.nicira import nvp_cluster
-from quantum.plugins.nicira.nvp_plugin_version import PLUGIN_VERSION
from quantum.plugins.nicira import NvpApiClient
from quantum.plugins.nicira import nvplib
# Map nova zones to cluster for easy retrieval
novazone_cluster_map = {}
- port_security_enabled_update = "update_port:port_security_enabled"
-
- def __init__(self, loglevel=None):
- if loglevel:
- logging.basicConfig(level=loglevel)
- nvplib.LOG.setLevel(loglevel)
- NvpApiClient.LOG.setLevel(loglevel)
+ def __init__(self):
# Routines for managing logical ports in NVP
self._port_drivers = {
self._nvp_delete_router_port,
l3_db.DEVICE_OWNER_FLOATINGIP:
self._nvp_delete_fip_port,
- l3_db.DEVICE_OWNER_ROUTER_INTF:
- self._nvp_delete_port,
networkgw_db.DEVICE_OWNER_NET_GW_INTF:
self._nvp_delete_port,
'default': self._nvp_delete_port}
"port id %(port_id)s on router %(router_id)s") %
{'port_id': port_data.get('id'), 'router_id': router_id})
self._update_router_port_attachment(cluster, context, router_id,
- port_data, attachment_type,
- attachment, attachment_vlan,
- lrouter_port['uuid'])
+ port_data, lrouter_port['uuid'],
+ attachment_type, attachment,
+ attachment_vlan)
return lrouter_port
def _update_router_port_attachment(self, cluster, context,
router_id, port_data,
- attachment_type, attachment,
- attachment_vlan=None,
- nvp_router_port_id=None):
+ nvp_router_port_id,
+ attachment_type,
+ attachment,
+ attachment_vlan=None):
if not nvp_router_port_id:
nvp_router_port_id = self._find_router_gw_port(context, port_data)
try:
err_msg=(_("It is not allowed to create router interface "
"ports on external networks as '%s'") %
port_data['network_id']))
- try:
- selected_lswitch = self._nvp_find_lswitch_for_port(context,
- port_data)
- # Do not apply port security here!
- lport = self._nvp_create_port_helper(self.cluster,
- selected_lswitch['uuid'],
- port_data,
- False)
- nicira_db.add_quantum_nvp_port_mapping(
- context.session, port_data['id'], lport['uuid'])
- LOG.debug(_("_nvp_create_port completed for port %(name)s on "
- "network %(network_id)s. The new port id is %(id)s."),
- port_data)
- except Exception:
- # failed to create port in NVP delete port from quantum_db
- LOG.exception(_("An exception occured while plugging "
- "the interface"))
- super(NvpPluginV2, self).delete_port(context, port_data["id"])
- raise
+ selected_lswitch = self._nvp_find_lswitch_for_port(context,
+ port_data)
+ # Do not apply port security here!
+ lport = self._nvp_create_port_helper(self.cluster,
+ selected_lswitch['uuid'],
+ port_data,
+ False)
+ nicira_db.add_quantum_nvp_port_mapping(
+ context.session, port_data['id'], lport['uuid'])
+ LOG.debug(_("_nvp_create_port completed for port %(name)s on "
+ "network %(network_id)s. The new port id is %(id)s."),
+ port_data)
def _find_router_gw_port(self, context, port_data):
router_id = port_data['device_id']
# Update attachment
self._update_router_port_attachment(
self.cluster, context, router_id, port_data,
+ lr_port['uuid'],
"L3GatewayAttachment",
ext_network[pnet.PHYSICAL_NETWORK],
- ext_network[pnet.SEGMENTATION_ID],
- lr_port['uuid'])
+ ext_network[pnet.SEGMENTATION_ID])
# Set the SNAT rule for each subnet (only first IP)
for cidr in self._find_router_subnets_cidrs(context, router_id):
cidr_prefix = int(cidr.split('/')[1])
# Reset attachment
self._update_router_port_attachment(
self.cluster, context, router_id, port_data,
+ lr_port['uuid'],
"L3GatewayAttachment",
- self.cluster.default_l3_gw_service_uuid,
- nvp_router_port_id=lr_port['uuid'])
+ self.cluster.default_l3_gw_service_uuid)
except NvpApiClient.ResourceNotFound:
raise nvp_exc.NvpPluginException(
port_data['network_id'])
# No need to actually update the DB state - the default is down
return port_data
- try:
- selected_lswitch = self._nvp_find_lswitch_for_port(context,
- port_data)
- lport = self._nvp_create_port_helper(self.cluster,
- selected_lswitch['uuid'],
- port_data,
- True)
- nicira_db.add_quantum_nvp_port_mapping(
- context.session, port_data['id'], lport['uuid'])
- nvplib.plug_l2_gw_service(
- self.cluster,
- port_data['network_id'],
- lport['uuid'],
- port_data['device_id'],
- int(port_data.get('gw:segmentation_id') or 0))
- LOG.debug(_("_nvp_create_port completed for port %(name)s "
- "on network %(network_id)s. The new port id "
- "is %(id)s."), port_data)
- except NvpApiClient.NvpApiException:
- # failed to create port in NVP delete port from quantum_db
- msg = (_("An exception occured while plugging the gateway "
- "interface into network:%s") % port_data['network_id'])
- LOG.exception(msg)
- super(NvpPluginV2, self).delete_port(context, port_data["id"])
- raise q_exc.QuantumException(message=msg)
+ selected_lswitch = self._nvp_find_lswitch_for_port(context,
+ port_data)
+ lport = self._nvp_create_port_helper(self.cluster,
+ selected_lswitch['uuid'],
+ port_data,
+ True)
+ nicira_db.add_quantum_nvp_port_mapping(
+ context.session, port_data['id'], lport['uuid'])
+ nvplib.plug_l2_gw_service(
+ self.cluster,
+ port_data['network_id'],
+ lport['uuid'],
+ port_data['device_id'],
+ int(port_data.get('gw:segmentation_id') or 0))
+ LOG.debug(_("_nvp_create_port completed for port %(name)s "
+ "on network %(network_id)s. The new port id "
+ "is %(id)s."), port_data)
def _nvp_create_fip_port(self, context, port_data):
# As we do not create ports for floating IPs in NVP,
# Find the NVP port corresponding to quantum port_id
# Do not query by nvp id as the port might be on
# an extended switch and we do not store the extended
- # swiwtch uuid
+ # switch uuid
results = nvplib.query_lswitch_lports(
self.cluster, '*',
relations='LogicalPortStatus',
def get_router(self, context, id, fields=None):
router = self._get_router(context, id)
try:
- try:
- lrouter = nvplib.get_lrouter(self.cluster, id)
- except q_exc.NotFound:
- lrouter = {}
- router_op_status = constants.NET_STATUS_ERROR
+ lrouter = nvplib.get_lrouter(self.cluster, id)
relations = lrouter.get('_relations')
if relations:
lrouter_status = relations.get('LogicalRouterStatus')
router_op_status = (lrouter_status.get('fabric_status')
and constants.NET_STATUS_ACTIVE or
constants.NET_STATUS_DOWN)
- if router_op_status != router.status:
- LOG.debug(_("Current router status:%(router_status)s;"
- "Status in Quantum DB:%(db_router_status)s"),
- {'router_status': router_op_status,
- 'db_router_status': router.status})
- # update the router status
- with context.session.begin(subtransactions=True):
- router.status = router_op_status
- except NvpApiClient.NvpApiException:
- err_msg = _("Unable to get logical router")
- LOG.exception(err_msg)
- raise nvp_exc.NvpPluginException(err_msg=err_msg)
+ except q_exc.NotFound:
+ lrouter = {}
+ router_op_status = constants.NET_STATUS_ERROR
+ if router_op_status != router.status:
+ LOG.debug(_("Current router status:%(router_status)s;"
+ "Status in Quantum DB:%(db_router_status)s"),
+ {'router_status': router_op_status,
+ 'db_router_status': router.status})
+ # update the router status
+ with context.session.begin(subtransactions=True):
+ router.status = router_op_status
return self._make_router_dict(router, fields)
def get_routers(self, context, filters=None, fields=None):
elif floatingip_db['fixed_port_id']:
# This is a disassociation.
# Remove floating IP address from logical router port
+ internal_ip = None
nvplib.update_lrouter_port_ips(self.cluster,
router_id,
nvp_gw_port_id,
LOG.exception(_("Unable to remove gateway service from "
"NVP plaform - the resource was not found"))
- def _ensure_tenant_on_net_gateway(self, context, net_gateway):
- if not net_gateway['tenant_id']:
- net_gateway['tenant_id'] = context.tenant_id
- return net_gateway
-
def get_network_gateway(self, context, id, fields=None):
# Ensure the default gateway in the config file is in sync with the db
self._ensure_default_network_gateway()
- # Ensure the tenant_id attribute is populated on the returned gateway
- #return self._ensure_tenant_on_net_gateway(
- # context, super(NvpPluginV2, self).get_network_gateway(
- # context, id, fields))
return super(NvpPluginV2, self).get_network_gateway(context,
id, fields)
return super(NvpPluginV2, self).disconnect_network(
context, network_gateway_id, network_mapping_info)
- def get_plugin_version(self):
- return PLUGIN_VERSION
-
def create_security_group(self, context, security_group, default_sg=False):
"""Create security group.
return
-def get_network_binding_by_vlanid_and_phynet(session, vlan_id,
- physical_network):
- session = session or db.get_session()
- try:
- binding = (session.query(nicira_models.NvpNetworkBinding).
- filter_by(vlan_id=vlan_id, phy_uuid=physical_network).
- one())
- return binding
- except exc.NoResultFound:
- return
-
-
def add_network_binding(session, network_id, binding_type, phy_uuid, vlan_id):
with session.begin(subtransactions=True):
binding = nicira_models.NvpNetworkBinding(network_id, binding_type,
+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2012 Nicira Networks, Inc.
-# All Rights Reserved
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-
-# This will get updated at build time. Version 0 indicates developer build.
-PLUGIN_VERSION = "0"
# @author: Aaron Rosen, Nicira Networks, Inc.
-from copy import copy
import hashlib
import inspect
import json
-import logging
from oslo.config import cfg
# no quantum-specific logic in it
from quantum.common import constants
from quantum.common import exceptions as exception
+from quantum.openstack.common import log
from quantum.plugins.nicira.common import (
exceptions as nvp_exc)
from quantum.plugins.nicira import NvpApiClient
+LOG = log.getLogger(__name__)
# HTTP METHODS CONSTANTS
HTTP_GET = "GET"
HTTP_POST = "POST"
DNAT_KEYS = ["to_dst_port", "to_dst_ip_min", "to_dst_ip_max"]
-LOCAL_LOGGING = False
-if LOCAL_LOGGING:
- from logging.handlers import SysLogHandler
- FORMAT = ("|%(levelname)s|%(filename)s|%(funcName)s|%(lineno)s"
- "|%(message)s")
- LOG = logging.getLogger(__name__)
- formatter = logging.Formatter(FORMAT)
- syslog = SysLogHandler(address="/dev/log")
- syslog.setFormatter(formatter)
- LOG.addHandler(syslog)
- LOG.setLevel(logging.DEBUG)
-else:
- LOG = logging.getLogger(__name__)
- LOG.setLevel(logging.DEBUG)
-
# TODO(bgh): it would be more efficient to use a bitmap
taken_context_ids = []
-_net_type_cache = {} # cache of {net_id: network_type}
# XXX Only cache default for now
_lqueue_cache = {}
"""Return major/minor version #."""
# Get control-cluster nodes
uri = "/ws.v1/control-cluster/node?_page_length=1&fields=uuid"
- try:
- res = do_single_request(HTTP_GET, uri, cluster=cluster)
- res = json.loads(res)
- except NvpApiClient.NvpApiException:
- raise exception.QuantumException()
+ res = do_request(HTTP_GET, uri, cluster=cluster)
if res["result_count"] == 0:
return None
node_uuid = res["results"][0]["uuid"]
# Get control-cluster node status. It's unsupported to have controllers
# running different version so we just need the first node version.
uri = "/ws.v1/control-cluster/node/%s/status" % node_uuid
- try:
- res = do_single_request(HTTP_GET, uri, cluster=cluster)
- res = json.loads(res)
- except NvpApiClient.NvpApiException:
- raise exception.QuantumException()
+ res = do_request(HTTP_GET, uri, cluster=cluster)
version_parts = res["version"].split(".")
version = "%s.%s" % tuple(version_parts[:2])
LOG.info(_("NVP controller cluster version: %s"), version)
while need_more_results:
page_cursor_str = (
"_page_cursor=%s" % page_cursor if page_cursor else "")
- res = do_single_request(HTTP_GET, "%s%s%s" %
- (path, query_marker, page_cursor_str),
- cluster=c)
- body = json.loads(res)
+ body = do_request(HTTP_GET,
+ "%s%s%s" % (path, query_marker, page_cursor_str),
+ cluster=c)
page_cursor = body.get('page_cursor')
if not page_cursor:
need_more_results = False
return result_list
-def do_single_request(*args, **kwargs):
- """Issue a request to a specified cluster if specified via kwargs
- (cluster=<cluster>).
- """
- cluster = kwargs["cluster"]
- try:
- req = cluster.api_client.request(*args)
- except NvpApiClient.ResourceNotFound:
- raise exception.NotFound()
- return req
-
-
-def do_multi_request(*args, **kwargs):
- """Issue a request to all clusters."""
- results = []
- clusters = kwargs["clusters"]
- for x in clusters:
- LOG.debug(_("Issuing request to cluster: %s"), x.name)
- rv = x.api_client.request(*args)
- results.append(rv)
- return results
-
-
# -------------------------------------------------------------------
# Network functions
# -------------------------------------------------------------------
-def find_port_and_cluster(clusters, port_id):
- """Find port and cluster.
-
- Returns (url, cluster_id) of port or (None, None) if port does not exist.
- """
- for c in clusters:
- query = "/ws.v1/lswitch/*/lport?uuid=%s&fields=*" % port_id
- LOG.debug(_("Looking for lswitch with port id "
- "'%(port_id)s' on: %(c)s"),
- {'port_id': port_id, 'c': c})
- try:
- res = do_single_request(HTTP_GET, query, cluster=c)
- except Exception as e:
- LOG.error(_("get_port_cluster_and_url, exception: %s"), str(e))
- continue
- res = json.loads(res)
- if len(res["results"]) == 1:
- return (res["results"][0], c)
- return (None, None)
-
-
-def find_lswitch_by_portid(clusters, port_id):
- port, cluster = find_port_and_cluster(clusters, port_id)
- if port and cluster:
- href = port["_href"].split('/')
- return (href[3], cluster)
- return (None, None)
-
-
def get_lswitches(cluster, quantum_net_id):
lswitch_uri_path = _build_uri_path(LSWITCH_RESOURCE, quantum_net_id,
relations="LogicalSwitchStatus")
results = []
try:
- resp_obj = do_single_request(HTTP_GET,
- lswitch_uri_path,
- cluster=cluster)
- ls = json.loads(resp_obj)
+ ls = do_request(HTTP_GET, lswitch_uri_path, cluster=cluster)
results.append(ls)
for tag in ls['tags']:
if (tag['scope'] == "multi_lswitch" and
cluster)
results.extend(extra_switches)
return results
- except NvpApiClient.ResourceNotFound:
+ except exception.NotFound:
raise exception.NetworkNotFound(net_id=quantum_net_id)
- except NvpApiClient.NvpApiException:
- # TODO(salvatore-olrando): Do a better exception handling
- # and re-raising
- LOG.exception(_("An error occured while fetching logical switches "
- "for Quantum network %s"), quantum_net_id)
- raise exception.QuantumException()
def create_lswitch(cluster, tenant_id, display_name,
if "tags" in kwargs:
lswitch_obj["tags"].extend(kwargs["tags"])
uri = _build_uri_path(LSWITCH_RESOURCE)
- try:
- lswitch_res = do_single_request(HTTP_POST, uri,
- json.dumps(lswitch_obj),
- cluster=cluster)
- except NvpApiClient.NvpApiException:
- raise exception.QuantumException()
- lswitch = json.loads(lswitch_res)
+ lswitch = do_request(HTTP_POST, uri, json.dumps(lswitch_obj),
+ cluster=cluster)
LOG.debug(_("Created logical switch: %s"), lswitch['uuid'])
return lswitch
if "tags" in kwargs:
lswitch_obj["tags"].extend(kwargs["tags"])
try:
- resp_obj = do_single_request(HTTP_PUT, uri, json.dumps(lswitch_obj),
- cluster=cluster)
- except NvpApiClient.ResourceNotFound as e:
+ return do_request(HTTP_PUT, uri, json.dumps(lswitch_obj),
+ cluster=cluster)
+ except exception.NotFound as e:
LOG.error(_("Network not found, Error: %s"), str(e))
raise exception.NetworkNotFound(net_id=lswitch_id)
- except NvpApiClient.NvpApiException as e:
- raise exception.QuantumException()
-
- obj = json.loads(resp_obj)
- return obj
def create_l2_gw_service(cluster, tenant_id, display_name, devices):
"gateways": gateways,
"type": "L2GatewayServiceConfig"
}
- try:
- return json.loads(do_single_request(
- "POST", _build_uri_path(GWSERVICE_RESOURCE),
- json.dumps(gwservice_obj), cluster=cluster))
- except NvpApiClient.NvpApiException:
- # just log and re-raise - let the caller handle it
- LOG.exception(_("Unable to create L2 Gateway Service "
- "%(name)s for tenant %(id)s.")
- % {'name': display_name, 'id': tenant_id})
- raise
+ return do_request(
+ "POST", _build_uri_path(GWSERVICE_RESOURCE),
+ json.dumps(gwservice_obj), cluster=cluster)
def create_lrouter(cluster, tenant_id, display_name, nexthop):
},
"type": "LogicalRouterConfig"
}
- try:
- return json.loads(do_single_request(HTTP_POST,
- _build_uri_path(LROUTER_RESOURCE),
- json.dumps(lrouter_obj),
- cluster=cluster))
- except NvpApiClient.NvpApiException:
- # just log and re-raise - let the caller handle it
- LOG.exception(_("Unable to create Router "
- "%(name)s for tenant %(id)s.")
- % {'name': display_name, 'id': tenant_id})
- raise
+ return do_request(HTTP_POST, _build_uri_path(LROUTER_RESOURCE),
+ json.dumps(lrouter_obj), cluster=cluster)
def delete_lrouter(cluster, lrouter_id):
- try:
- do_single_request(HTTP_DELETE,
- _build_uri_path(LROUTER_RESOURCE,
- resource_id=lrouter_id),
- cluster=cluster)
- except NvpApiClient.NvpApiException:
- # just log and re-raise - let the caller handle it
- LOG.exception(_("Unable to delete Router "
- "Service %s."), lrouter_id)
- raise
+ do_request(HTTP_DELETE, _build_uri_path(LROUTER_RESOURCE,
+ resource_id=lrouter_id),
+ cluster=cluster)
def delete_l2_gw_service(cluster, gateway_id):
- try:
- do_single_request("DELETE",
- _build_uri_path(GWSERVICE_RESOURCE,
- resource_id=gateway_id),
- cluster=cluster)
- except NvpApiClient.NvpApiException:
- # just log and re-raise - let the caller handle it
- LOG.exception(_("Unable to delete L2 Gateway "
- "Service %s."), gateway_id)
- raise
+ do_request("DELETE", _build_uri_path(GWSERVICE_RESOURCE,
+ resource_id=gateway_id),
+ cluster=cluster)
def get_lrouter(cluster, lrouter_id):
- try:
- return json.loads(do_single_request(HTTP_GET,
- _build_uri_path(LROUTER_RESOURCE,
- resource_id=lrouter_id,
- relations='LogicalRouterStatus'),
- cluster=cluster))
- except NvpApiClient.NvpApiException:
- # just log and re-raise - let the caller handle it
- LOG.exception(_("Unable to get Router "
- "Service %s."), lrouter_id)
- raise
+ return do_request(HTTP_GET,
+ _build_uri_path(LROUTER_RESOURCE,
+ resource_id=lrouter_id,
+ relations='LogicalRouterStatus'),
+ cluster=cluster)
def get_l2_gw_service(cluster, gateway_id):
- try:
- return json.loads(do_single_request("GET",
- _build_uri_path(GWSERVICE_RESOURCE,
- resource_id=gateway_id),
- cluster=cluster))
- except NvpApiClient.NvpApiException:
- # just log and re-raise - let the caller handle it
- LOG.exception(_("Unable to get L2 Gateway "
- "Service %s."), gateway_id)
- raise
+ return do_request(
+ "GET", _build_uri_path(GWSERVICE_RESOURCE,
+ resource_id=gateway_id),
+ cluster=cluster)
def get_lrouters(cluster, tenant_id, fields=None, filters=None):
# Nothing to update
return gwservice_obj
gwservice_obj["display_name"] = _check_and_truncate_name(display_name)
- try:
- return json.loads(do_single_request("PUT",
- _build_uri_path(GWSERVICE_RESOURCE,
- resource_id=gateway_id),
- json.dumps(gwservice_obj),
- cluster=cluster))
- except NvpApiClient.NvpApiException:
- # just log and re-raise - let the caller handle it
- LOG.exception(_("Unable to update L2 Gateway Service "
- "%(id)s with name %(name)s.") %
- {'id': gateway_id, 'name': display_name})
- raise
+ return do_request("PUT", _build_uri_path(GWSERVICE_RESOURCE,
+ resource_id=gateway_id),
+ json.dumps(gwservice_obj), cluster=cluster)
def update_lrouter(cluster, lrouter_id, display_name, nexthop):
"default_route_next_hop")
if nh_element:
nh_element["gateway_ip_address"] = nexthop
- try:
- return json.loads(do_single_request(HTTP_PUT,
- _build_uri_path(LROUTER_RESOURCE,
- resource_id=lrouter_id),
- json.dumps(lrouter_obj),
- cluster=cluster))
- except NvpApiClient.NvpApiException:
- # just log and re-raise - let the caller handle it
- LOG.exception(_("Unable to update Router "
- "%(id)s with name %(name)s.") %
- {'id': lrouter_id, 'name': display_name})
- raise
-
-
-def get_all_networks(cluster, tenant_id, networks):
- """Get all networks.
-
- Append the quantum network uuids we can find in the given cluster
- to "networks".
- """
- uri = "/ws.v1/lswitch?fields=*&tag=%s&tag_scope=os_tid" % tenant_id
- try:
- resp_obj = do_single_request(HTTP_GET, uri, cluster=cluster)
- except NvpApiClient.NvpApiException:
- raise exception.QuantumException()
- if not resp_obj:
- return []
- networks_result = copy(networks)
- return networks_result
-
-
-def query_networks(cluster, tenant_id, fields="*", tags=None):
- uri = "/ws.v1/lswitch?fields=%s" % fields
- if tags:
- for t in tags:
- uri += "&tag=%s&tag_scope=%s" % (t[0], t[1])
- try:
- resp_obj = do_single_request(HTTP_GET, uri, cluster=cluster)
- except NvpApiClient.NvpApiException:
- raise exception.QuantumException()
- if not resp_obj:
- return []
- lswitches = json.loads(resp_obj)["results"]
- nets = [{'net-id': lswitch["uuid"], 'net-name': lswitch["display_name"]}
- for lswitch in lswitches]
- return nets
+ return do_request(HTTP_PUT, _build_uri_path(LROUTER_RESOURCE,
+ resource_id=lrouter_id),
+ json.dumps(lrouter_obj),
+ cluster=cluster)
def delete_network(cluster, net_id, lswitch_id):
delete_networks(cluster, net_id, [lswitch_id])
+#TODO(salvatore-orlando): Simplify and harmonize
def delete_networks(cluster, net_id, lswitch_ids):
- if net_id in _net_type_cache:
- del _net_type_cache[net_id]
for ls_id in lswitch_ids:
path = "/ws.v1/lswitch/%s" % ls_id
-
try:
- do_single_request(HTTP_DELETE, path, cluster=cluster)
- except NvpApiClient.ResourceNotFound as e:
+ do_request(HTTP_DELETE, path, cluster=cluster)
+ except exception.NotFound as e:
LOG.error(_("Network not found, Error: %s"), str(e))
raise exception.NetworkNotFound(net_id=ls_id)
- except NvpApiClient.NvpApiException as e:
- raise exception.QuantumException()
def query_lswitch_lports(cluster, ls_uuid, fields="*",
del filters['attachment']
uri = _build_uri_path(LSWITCHPORT_RESOURCE, parent_resource_id=ls_uuid,
fields=fields, filters=filters, relations=relations)
- try:
- resp_obj = do_single_request(HTTP_GET, uri, cluster=cluster)
- except NvpApiClient.ResourceNotFound:
- LOG.exception(_("Logical switch: %s not found"), ls_uuid)
- raise
- except NvpApiClient.NvpApiException:
- LOG.exception(_("An error occurred while querying logical ports on "
- "the NVP platform"))
- raise
- return json.loads(resp_obj)["results"]
+ return do_request(HTTP_GET, uri, cluster=cluster)['results']
def query_lrouter_lports(cluster, lr_uuid, fields="*",
filters=None, relations=None):
uri = _build_uri_path(LROUTERPORT_RESOURCE, parent_resource_id=lr_uuid,
fields=fields, filters=filters, relations=relations)
- try:
- resp_obj = do_single_request(HTTP_GET, uri, cluster=cluster)
- except NvpApiClient.ResourceNotFound:
- LOG.exception(_("Logical router: %s not found"), lr_uuid)
- raise
- except NvpApiClient.NvpApiException:
- LOG.exception(_("An error occured while querying logical router "
- "ports on the NVP platfom"))
- raise
- return json.loads(resp_obj)["results"]
+ return do_request(HTTP_GET, uri, cluster=cluster)['results']
def delete_port(cluster, switch, port):
uri = "/ws.v1/lswitch/" + switch + "/lport/" + port
try:
- do_single_request(HTTP_DELETE, uri, cluster=cluster)
- except NvpApiClient.ResourceNotFound as e:
- LOG.error(_("Port or Network not found, Error: %s"), str(e))
- raise exception.PortNotFound(port_id=port['uuid'])
- except NvpApiClient.NvpApiException as e:
+ do_request(HTTP_DELETE, uri, cluster=cluster)
+ except exception.NotFound:
+ LOG.exception(_("Port or Network not found"))
+ raise exception.PortNotFound(net_id=switch,
+ port_id=port)
+ except NvpApiClient.NvpApiException:
raise exception.QuantumException()
-def get_port_by_display_name(clusters, lswitch, display_name):
- """Return (url, cluster_id) of port or raises PortNotFound."""
- query = ("/ws.v1/lswitch/%s/lport?display_name=%s&fields=*" %
- (lswitch, display_name))
- LOG.debug(_("Looking for port with display_name "
- "'%(display_name)s' on: %(lswitch)s"),
- {'display_name': display_name, 'lswitch': lswitch})
- for c in clusters:
- try:
- res_obj = do_single_request(HTTP_GET, query, cluster=c)
- except Exception as e:
- continue
- res = json.loads(res_obj)
- if len(res["results"]) == 1:
- return (res["results"][0], c)
-
- LOG.error(_("Port or Network not found, Error: %s"), str(e))
- raise exception.PortNotFound(port_id=display_name, net_id=lswitch)
-
-
def get_port_by_quantum_tag(cluster, lswitch_uuid, quantum_port_id):
"""Get port by quantum tag.
"on: '%(lswitch_uuid)s'") %
{'quantum_port_id': quantum_port_id,
'lswitch_uuid': lswitch_uuid})
- try:
- res_obj = do_single_request(HTTP_GET, uri, cluster=cluster)
- except Exception:
- LOG.exception(_("An exception occurred while querying NVP ports"))
- raise
- res = json.loads(res_obj)
+ res = do_request(HTTP_GET, uri, cluster=cluster)
num_results = len(res["results"])
if num_results >= 1:
if num_results > 1:
if relations:
uri += "relations=%s" % relations
try:
- resp_obj = do_single_request(HTTP_GET, uri, cluster=cluster)
- port = json.loads(resp_obj)
- except NvpApiClient.ResourceNotFound as e:
+ return do_request(HTTP_GET, uri, cluster=cluster)
+ except exception.NotFound as e:
LOG.error(_("Port or Network not found, Error: %s"), str(e))
raise exception.PortNotFound(port_id=port, net_id=network)
- except NvpApiClient.NvpApiException as e:
- raise exception.QuantumException()
- return port
def _configure_extensions(lport_obj, mac_address, fixed_ips,
path = "/ws.v1/lswitch/" + lswitch_uuid + "/lport/" + lport_uuid
try:
- resp_obj = do_single_request(HTTP_PUT, path, json.dumps(lport_obj),
- cluster=cluster)
- except NvpApiClient.ResourceNotFound as e:
+ result = do_request(HTTP_PUT, path, json.dumps(lport_obj),
+ cluster=cluster)
+ LOG.debug(_("Updated logical port %(result)s "
+ "on logical switch %(uuid)s"),
+ {'result': result['uuid'], 'uuid': lswitch_uuid})
+ return result
+ except exception.NotFound as e:
LOG.error(_("Port or Network not found, Error: %s"), str(e))
raise exception.PortNotFound(port_id=lport_uuid, net_id=lswitch_uuid)
- except NvpApiClient.NvpApiException as e:
- raise exception.QuantumException()
- result = json.loads(resp_obj)
- LOG.debug(_("Updated logical port %(result)s on logical swtich %(uuid)s"),
- {'result': result['uuid'], 'uuid': lswitch_uuid})
- return result
def create_lport(cluster, lswitch_uuid, tenant_id, quantum_port_id,
path = _build_uri_path(LSWITCHPORT_RESOURCE,
parent_resource_id=lswitch_uuid)
- try:
- resp_obj = do_single_request(HTTP_POST, path,
- json.dumps(lport_obj),
- cluster=cluster)
- except NvpApiClient.ResourceNotFound as e:
- LOG.error(_("Logical switch not found, Error: %s"), str(e))
- raise
-
- result = json.loads(resp_obj)
+ result = do_request(HTTP_POST, path, json.dumps(lport_obj),
+ cluster=cluster)
+
LOG.debug(_("Created logical port %(result)s on logical swtich %(uuid)s"),
{'result': result['uuid'], 'uuid': lswitch_uuid})
return result
)
path = _build_uri_path(LROUTERPORT_RESOURCE,
parent_resource_id=lrouter_uuid)
- try:
- resp_obj = do_single_request(HTTP_POST, path,
- json.dumps(lport_obj),
- cluster=cluster)
- except NvpApiClient.ResourceNotFound as e:
- LOG.error(_("Logical router not found, Error: %s"), str(e))
- raise
-
- result = json.loads(resp_obj)
+ result = do_request(HTTP_POST, path, json.dumps(lport_obj),
+ cluster=cluster)
+
LOG.debug(_("Created logical port %(lport_uuid)s on "
"logical router %(lrouter_uuid)s"),
{'lport_uuid': result['uuid'],
path = _build_uri_path(LROUTERPORT_RESOURCE,
lrouter_port_uuid,
parent_resource_id=lrouter_uuid)
- try:
- resp_obj = do_single_request(HTTP_PUT, path,
- json.dumps(lport_obj),
- cluster=cluster)
- except NvpApiClient.ResourceNotFound as e:
- LOG.error(_("Logical router or router port not found, "
- "Error: %s"), str(e))
- raise
-
- result = json.loads(resp_obj)
+ result = do_request(HTTP_PUT, path,
+ json.dumps(lport_obj),
+ cluster=cluster)
LOG.debug(_("Updated logical port %(lport_uuid)s on "
"logical router %(lrouter_uuid)s"),
{'lport_uuid': lrouter_port_uuid, 'lrouter_uuid': lrouter_uuid})
def delete_router_lport(cluster, lrouter_uuid, lport_uuid):
"""Creates a logical port on the assigned logical router."""
path = _build_uri_path(LROUTERPORT_RESOURCE, lport_uuid, lrouter_uuid)
- try:
- do_single_request(HTTP_DELETE, path, cluster=cluster)
- except NvpApiClient.ResourceNotFound as e:
- LOG.error(_("Logical router not found, Error: %s"), str(e))
- raise
+ do_request(HTTP_DELETE, path, cluster=cluster)
LOG.debug(_("Delete logical router port %(lport_uuid)s on "
"logical router %(lrouter_uuid)s"),
{'lport_uuid': lport_uuid,
def delete_peer_router_lport(cluster, lr_uuid, ls_uuid, lp_uuid):
nvp_port = get_port(cluster, ls_uuid, lp_uuid,
relations="LogicalPortAttachment")
- try:
- relations = nvp_port.get('_relations')
- if relations:
- att_data = relations.get('LogicalPortAttachment')
- if att_data:
- lrp_uuid = att_data.get('peer_port_uuid')
- if lrp_uuid:
- delete_router_lport(cluster, lr_uuid, lrp_uuid)
- except (NvpApiClient.NvpApiException, NvpApiClient.ResourceNotFound):
- LOG.exception(_("Unable to fetch and delete peer logical "
- "router port for logical switch port:%s"),
- lp_uuid)
- raise
+ relations = nvp_port.get('_relations')
+ if relations:
+ att_data = relations.get('LogicalPortAttachment')
+ if att_data:
+ lrp_uuid = att_data.get('peer_port_uuid')
+ if lrp_uuid:
+ delete_router_lport(cluster, lr_uuid, lrp_uuid)
def find_router_gw_port(context, cluster, router_id):
if attachment_vlan:
attach_obj['vlan_id'] = attachment_vlan
else:
+ # TODO(salv-orlando): avoid raising generic exception
raise Exception(_("Invalid NVP attachment type '%s'"),
nvp_attachment_type)
- try:
- resp_obj = do_single_request(
- HTTP_PUT, uri, json.dumps(attach_obj), cluster=cluster)
- except NvpApiClient.ResourceNotFound as e:
- LOG.exception(_("Router Port not found, Error: %s"), str(e))
- raise
- except NvpApiClient.Conflict as e:
- LOG.exception(_("Conflict while setting router port attachment"))
- raise
- except NvpApiClient.NvpApiException as e:
- LOG.exception(_("Unable to plug attachment into logical router port"))
- raise
- result = json.loads(resp_obj)
- return result
+ return do_request(HTTP_PUT, uri, json.dumps(attach_obj), cluster=cluster)
def get_port_status(cluster, lswitch_id, port_id):
"""Retrieve the operational status of the port."""
try:
- r = do_single_request(HTTP_GET,
- "/ws.v1/lswitch/%s/lport/%s/status" %
- (lswitch_id, port_id), cluster=cluster)
- r = json.loads(r)
- except NvpApiClient.ResourceNotFound as e:
+ r = do_request(HTTP_GET,
+ "/ws.v1/lswitch/%s/lport/%s/status" %
+ (lswitch_id, port_id), cluster=cluster)
+ except exception.NotFound as e:
LOG.error(_("Port not found, Error: %s"), str(e))
raise exception.PortNotFound(port_id=port_id, net_id=lswitch_id)
- except NvpApiClient.NvpApiException as e:
- raise exception.QuantumException()
if r['link_status_up'] is True:
return constants.PORT_STATUS_ACTIVE
else:
def _plug_interface(cluster, lswitch_id, lport_id, att_obj):
uri = _build_uri_path(LSWITCHPORT_RESOURCE, lport_id, lswitch_id,
is_attachment=True)
- try:
- resp_obj = do_single_request(HTTP_PUT, uri, json.dumps(att_obj),
- cluster=cluster)
- except NvpApiClient.NvpApiException:
- LOG.exception(_("Exception while plugging an attachment:%(att)s "
- "into NVP port:%(port)s for NVP logical switch "
- "%(net)s"), {'net': lswitch_id,
- 'port': lport_id,
- 'att': att_obj})
- raise
-
- result = json.dumps(resp_obj)
- return result
+ return do_request(HTTP_PUT, uri, json.dumps(att_obj),
+ cluster=cluster)
def plug_l2_gw_service(cluster, lswitch_id, lport_id,
TENANT_ID_SCOPE = 'os_tid'
-def format_exception(etype, e, execption_locals, request=None):
+def format_exception(etype, e, exception_locals):
"""Consistent formatting for exceptions.
:param etype: a string describing the exception type.
:param e: the exception.
- :param request: the request object.
:param execption_locals: calling context local variable dict.
:returns: a formatted string.
"""
msg = ["Error. %s exception: %s." % (etype, e)]
- if request:
- msg.append("request=[%s]" % request)
- if request.body:
- msg.append("request.body=[%s]" % str(request.body))
- l = dict((k, v) for k, v in execption_locals if k != 'request')
+ l = dict((k, v) for k, v in exception_locals.iteritems()
+ if k != 'request')
msg.append("locals=[%s]" % str(l))
return ' '.join(msg)
def do_request(*args, **kwargs):
- """Convenience function wraps do_single_request.
+ """Issue a request to the cluster specified in kwargs.
:param args: a list of positional arguments.
:param kwargs: a list of keyworkds arguments.
- :returns: the result of do_single_request loaded into a python object
- or None.
+ :returns: the result of the operation loaded into a python
+ object or None.
"""
- res = do_single_request(*args, **kwargs)
- if res:
- return json.loads(res)
- return res
+ cluster = kwargs["cluster"]
+ try:
+ res = cluster.api_client.request(*args)
+ if res:
+ return json.loads(res)
+ except NvpApiClient.ResourceNotFound:
+ raise exception.NotFound()
def mk_body(**kwargs):
{'ethertype': 'IPv6'}]}
tags = [dict(scope='os_tid', tag=tenant_id),
dict(scope='quantum', tag=QUANTUM_VERSION)]
- try:
- display_name = _check_and_truncate_name(security_profile.get('name'))
- body = mk_body(
- tags=tags, display_name=display_name,
- logical_port_ingress_rules=(
- hidden_rules['logical_port_ingress_rules']),
- logical_port_egress_rules=hidden_rules['logical_port_egress_rules']
- )
- rsp = do_request(HTTP_POST, path, body, cluster=cluster)
- except NvpApiClient.NvpApiException as e:
- LOG.error(format_exception("Unknown", e, locals()))
- raise exception.QuantumException()
+ display_name = _check_and_truncate_name(security_profile.get('name'))
+ body = mk_body(
+ tags=tags, display_name=display_name,
+ logical_port_ingress_rules=(
+ hidden_rules['logical_port_ingress_rules']),
+ logical_port_egress_rules=hidden_rules['logical_port_egress_rules']
+ )
+ rsp = do_request(HTTP_POST, path, body, cluster=cluster)
if security_profile.get('name') == 'default':
# If security group is default allow ip traffic between
# members of the same security profile is allowed and ingress traffic
logical_port_ingress_rules=rules['logical_port_ingress_rules'],
logical_port_egress_rules=rules['logical_port_egress_rules'])
rsp = do_request(HTTP_PUT, path, body, cluster=cluster)
- except NvpApiClient.NvpApiException as e:
+ except exception.NotFound as e:
LOG.error(format_exception("Unknown", e, locals()))
+ #FIXME(salvatore-orlando): This should not raise QuantumException
raise exception.QuantumException()
LOG.debug(_("Updated Security Profile: %s"), rsp)
return rsp
try:
do_request(HTTP_DELETE, path, cluster=cluster)
- except NvpApiClient.NvpApiException as e:
+ except exception.NotFound as e:
+ # FIXME(salv-orlando): should not raise QuantumException
LOG.error(format_exception("Unknown", e, locals()))
raise exception.QuantumException()
def _create_lrouter_nat_rule(cluster, router_id, nat_rule_obj):
LOG.debug(_("Creating NAT rule: %s"), nat_rule_obj)
uri = _build_uri_path(LROUTERNAT_RESOURCE, parent_resource_id=router_id)
- try:
- resp = do_single_request(HTTP_POST, uri, json.dumps(nat_rule_obj),
- cluster=cluster)
- except NvpApiClient.ResourceNotFound:
- LOG.exception(_("NVP Logical Router %s not found"), router_id)
- raise
- except NvpApiClient.NvpApiException:
- LOG.exception(_("An error occurred while creating the NAT rule "
- "on the NVP platform"))
- raise
- rule = json.loads(resp)
- return rule
+ return do_request(HTTP_POST, uri, json.dumps(nat_rule_obj),
+ cluster=cluster)
def _build_snat_rule_obj(min_src_ip, max_src_ip, nat_match_obj):
def delete_router_nat_rule(cluster, router_id, rule_id):
uri = _build_uri_path(LROUTERNAT_RESOURCE, rule_id, router_id)
- try:
- do_single_request(HTTP_DELETE, uri, cluster=cluster)
- except NvpApiClient.NvpApiException:
- LOG.exception(_("An error occurred while removing NAT rule "
- "'%(nat_rule_uuid)s' for logical "
- "router '%(lrouter_uuid)s'"),
- {'nat_rule_uuid': rule_id, 'lrouter_uuid': router_id})
- raise
-
-
-def get_router_nat_rule(cluster, tenant_id, router_id, rule_id):
- uri = _build_uri_path(LROUTERNAT_RESOURCE, rule_id, router_id)
- try:
- resp = do_single_request(HTTP_GET, uri, cluster=cluster)
- except NvpApiClient.ResourceNotFound:
- LOG.exception(_("NAT rule %s not found"), rule_id)
- raise
- except NvpApiClient.NvpApiException:
- LOG.exception(_("An error occured while retrieving NAT rule '%s'"
- "from NVP platform"), rule_id)
- raise
- res = json.loads(resp)
- return res
+ do_request(HTTP_DELETE, uri, cluster=cluster)
def query_nat_rules(cluster, router_id, fields="*", filters=None):
uri = _build_uri_path(LROUTERNAT_RESOURCE, parent_resource_id=router_id,
fields=fields, filters=filters)
- try:
- result = get_all_query_pages(uri, cluster)
- except NvpApiClient.ResourceNotFound:
- LOG.exception(_("NVP Logical Router '%s' not found"), router_id)
- raise
- except NvpApiClient.NvpApiException:
- LOG.exception(_("An error occured while retrieving NAT rules for "
- "NVP logical router '%s'"), router_id)
- raise
- return result
+ return get_all_query_pages(uri, cluster)
# NOTE(salvatore-orlando): The following FIXME applies in general to
ips_to_add, ips_to_remove):
uri = _build_uri_path(LROUTERPORT_RESOURCE, lport_id, lrouter_id)
try:
- port = json.loads(do_single_request(HTTP_GET, uri, cluster=cluster))
+ port = do_request(HTTP_GET, uri, cluster=cluster)
# TODO(salvatore-orlando): Enforce ips_to_add intersection with
# ips_to_remove is empty
ip_address_set = set(port['ip_addresses'])
ip_address_set = ip_address_set | set(ips_to_add)
# Set is not JSON serializable - convert to list
port['ip_addresses'] = list(ip_address_set)
- do_single_request(HTTP_PUT, uri, json.dumps(port), cluster=cluster)
- except NvpApiClient.ResourceNotFound as e:
+ do_request(HTTP_PUT, uri, json.dumps(port), cluster=cluster)
+ except exception.NotFound as e:
+ # FIXME(salv-orlando):avoid raising different exception
data = {'lport_id': lport_id, 'lrouter_id': lrouter_id}
msg = (_("Router Port %(lport_id)s not found on router "
"%(lrouter_id)s") % data)
uri = _build_uri_path(LQUEUE_RESOURCE)
lqueue['tags'] = [{'tag': QUANTUM_VERSION, 'scope': 'quantum'}]
try:
- resp_obj = do_single_request(HTTP_POST, uri, json.dumps(lqueue),
- cluster=cluster)
+ return do_request(HTTP_POST, uri, json.dumps(lqueue),
+ cluster=cluster)['uuid']
except NvpApiClient.NvpApiException:
+ # FIXME(salv-orlando): This should not raise QauntumException
LOG.exception(_("Failed to create logical queue"))
raise exception.QuantumException()
- return json.loads(resp_obj)['uuid']
def delete_lqueue(cluster, id):
try:
- do_single_request(HTTP_DELETE,
- _build_uri_path(LQUEUE_RESOURCE,
- resource_id=id),
- cluster=cluster)
+ do_request(HTTP_DELETE, _build_uri_path(LQUEUE_RESOURCE,
+ resource_id=id),
+ cluster=cluster)
except Exception:
+ # FIXME(salv-orlando): This should not raise QauntumException
LOG.exception(_("Failed to delete logical queue"))
raise exception.QuantumException()
# -----------------------------------------------------------------------------
def config_helper(http_method, http_uri, cluster):
try:
- resp = do_single_request(http_method,
- http_uri,
- cluster=cluster)
+ return do_request(http_method,
+ http_uri,
+ cluster=cluster)
except Exception as e:
msg = ("Error '%s' when connecting to controller(s): %s."
% (str(e), ', '.join(cluster.nvp_controllers)))
raise Exception(msg)
- return json.loads(resp)
def check_cluster_connectivity(cluster):
--- /dev/null
+{
+ "display_name": "%(display_name)s",
+ "uuid": "%(uuid)s",
+ "type": "LogicalSwitchConfig",
+ "_schema": "/ws.v1/schema/LogicalQueueConfig",
+ "dscp": "%(dscp)s",
+ "max_bandwidth_rate": "%(max_bandwidth_rate)s",
+ "min_bandwidth_rate": "%(min_bandwidth_rate)s",
+ "qos_marking": "%(qos_marking)s",
+ "_href": "/ws.v1/lqueue/%(uuid)s"
+}
{
"display_name": "%(display_name)s",
+ "admin_status_enabled": "%(admin_status_enabled)s",
"_href": "/ws.v1/lrouter/%(lr_uuid)s/lport/%(uuid)s",
"tags":
[{"scope": "q_port_id", "tag": "%(quantum_port_id)s"},
-{"display_name": "%(uuid)s",
+{"display_name": "%(display_name)s",
"_relations":
{"LogicalPortStatus":
{"type": "LogicalSwitchPortStatus",
{"scope": "vm_id", "tag": "%(quantum_device_id)s"},
{"scope": "os_tid", "tag": "%(tenant_id)s"}],
"uuid": "%(uuid)s",
- "admin_status_enabled": true,
+ "admin_status_enabled": "%(admin_status_enabled)s",
"type": "LogicalSwitchPortConfig",
"_schema": "/ws.v1/schema/LogicalSwitchPortConfig",
"_href": "/ws.v1/lswitch/%(ls_uuid)s/lport/%(uuid)s"
"link_status_up": false,
"_schema": "/ws.v1/schema/LogicalSwitchPortStatus",
"admin_status_enabled": true,
- "fabric_status_up": false,
+ "fabric_status_up": true,
+ "link_status_up": true,
"type": "LogicalSwitchPortStatus"
}
--- /dev/null
+{
+ "display_name": "%(display_name)s",
+ "_href": "/ws.v1/security-profile/%(uuid)s",
+ "tags": [{"scope": "os_tid", "tag": "%(tenant_id)s"},
+ {"scope": "nova_spid", "tag": "%(nova_spid)s"}],
+ "logical_port_egress_rules": %(logical_port_egress_rules_json)s,
+ "_schema": "/ws.v1/schema/SecurityProfileConfig",
+ "logical_port_ingress_rules": %(logical_port_ingress_rules_json)s,
+ "uuid": "%(uuid)s"
+}
{
%(peer_port_href_field)s
%(peer_port_uuid_field)s
+ %(l3_gateway_service_uuid_field)s
+ %(vlan_id_field)s
"_href": "/ws.v1/lrouter/%(lr_uuid)s/lport/%(lp_uuid)s/attachment",
"type": "%(type)s",
"schema": "/ws.v1/schema/%(type)s"
LROUTER_LPORT_ATT: "fake_get_lrouter_lport_att.json",
LROUTER_STATUS: "fake_get_lrouter_status.json",
LROUTER_NAT_RESOURCE: "fake_get_lrouter_nat.json",
+ SECPROF_RESOURCE: "fake_get_security_profile.json",
+ LQUEUE_RESOURCE: "fake_get_lqueue.json",
GWSERVICE_RESOURCE: "fake_get_gwservice.json"
}
fake_lswitch['lport_count'] = 0
return fake_lswitch
- def _add_lrouter(self, body):
+ def _build_lrouter(self, body, uuid=None):
fake_lrouter = json.loads(body)
- fake_lrouter['uuid'] = uuidutils.generate_uuid()
- self._fake_lrouter_dict[fake_lrouter['uuid']] = fake_lrouter
+ if uuid:
+ fake_lrouter['uuid'] = uuid
fake_lrouter['tenant_id'] = self._get_tag(fake_lrouter, 'os_tid')
- fake_lrouter['lport_count'] = 0
default_nexthop = fake_lrouter['routing_config'].get(
'default_route_next_hop')
fake_lrouter['default_next_hop'] = default_nexthop.get(
'gateway_ip_address', '0.0.0.0')
return fake_lrouter
+ def _add_lrouter(self, body):
+ fake_lrouter = self._build_lrouter(body,
+ uuidutils.generate_uuid())
+ self._fake_lrouter_dict[fake_lrouter['uuid']] = fake_lrouter
+ fake_lrouter['lport_count'] = 0
+ return fake_lrouter
+
def _add_lqueue(self, body):
fake_lqueue = json.loads(body)
fake_lqueue['uuid'] = uuidutils.generate_uuid()
self._fake_lswitch_lportstatus_dict[new_uuid] = fake_lport_status
return fake_lport
- def _add_lrouter_lport(self, body, lr_uuid):
+ def _build_lrouter_lport(self, body, new_uuid=None, lr_uuid=None):
fake_lport = json.loads(body)
- new_uuid = uuidutils.generate_uuid()
- fake_lport['uuid'] = new_uuid
- # put the tenant_id and the ls_uuid in the main dict
+ if new_uuid:
+ fake_lport['uuid'] = new_uuid
+ # put the tenant_id and the le_uuid in the main dict
# for simplyfying templating
- fake_lport['lr_uuid'] = lr_uuid
+ if lr_uuid:
+ fake_lport['lr_uuid'] = lr_uuid
fake_lport['tenant_id'] = self._get_tag(fake_lport, 'os_tid')
fake_lport['quantum_port_id'] = self._get_tag(fake_lport,
'q_port_id')
if 'ip_addresses' in fake_lport:
ip_addresses_json = json.dumps(fake_lport['ip_addresses'])
fake_lport['ip_addresses_json'] = ip_addresses_json
+ return fake_lport
+
+ def _add_lrouter_lport(self, body, lr_uuid):
+ new_uuid = uuidutils.generate_uuid()
+ fake_lport = self._build_lrouter_lport(body, new_uuid, lr_uuid)
self._fake_lrouter_lport_dict[fake_lport['uuid']] = fake_lport
- fake_lrouter = self._fake_lrouter_dict[lr_uuid]
+ try:
+ fake_lrouter = self._fake_lrouter_dict[lr_uuid]
+ except KeyError:
+ raise NvpApiClient.ResourceNotFound()
fake_lrouter['lport_count'] += 1
fake_lport_status = fake_lport.copy()
fake_lport_status['lr_tenant_id'] = fake_lrouter['tenant_id']
for item in res_dict.itervalues():
if 'tags' in item:
item['tags_json'] = json.dumps(item['tags'])
+
+ # replace sec prof rules with their json dump
+ def jsonify_rules(rule_key):
+ if rule_key in item:
+ rules_json = json.dumps(item[rule_key])
+ item['%s_json' % rule_key] = rules_json
+ jsonify_rules('logical_port_egress_rules')
+ jsonify_rules('logical_port_ingress_rules')
+
items = [json.loads(response_template % res_dict[res_uuid])
for res_uuid in res_dict if res_uuid == target_uuid]
if items:
elif ('lswitch' in res_type or
'lrouter' in res_type or
self.SECPROF_RESOURCE in res_type or
+ self.LQUEUE_RESOURCE in res_type or
'gatewayservice' in res_type):
LOG.debug("UUIDS:%s", uuids)
if uuids:
val_func = self._validators.get(res_type)
if val_func:
val_func(body_json)
- resource = res_dict[uuids[-1]]
+ try:
+ resource = res_dict[uuids[-1]]
+ except KeyError:
+ raise NvpApiClient.ResourceNotFound()
if not is_attachment:
+ edit_resource = getattr(self, '_build_%s' % res_type, None)
+ if edit_resource:
+ body_json = edit_resource(body)
resource.update(body_json)
else:
relations = resource.get("_relations", {})
for k, v in keys:
self.assertEqual(gw[self.resource][k], v)
+ def test_create_network_gateway_no_interface_name(self):
+ name = 'test-gw'
+ devices = [{'id': _uuid()}]
+ exp_devices = devices
+ exp_devices[0]['interface_name'] = 'breth0'
+ keys = [('devices', exp_devices), ('name', name)]
+ with self._network_gateway(name=name, devices=devices) as gw:
+ for k, v in keys:
+ self.assertEqual(gw[self.resource][k], v)
+
def _test_delete_network_gateway(self, exp_gw_count=0):
name = 'test-gw'
devices = [{'id': _uuid(), 'interface_name': 'xxx'},
from quantum.extensions import providernet as pnet
from quantum.extensions import securitygroup as secgrp
from quantum import manager
+from quantum.openstack.common import uuidutils
import quantum.plugins.nicira as nvp_plugin
from quantum.plugins.nicira.extensions import nvp_networkgw
from quantum.plugins.nicira.extensions import nvp_qos as ext_qos
+from quantum.plugins.nicira import NvpApiClient
from quantum.plugins.nicira import nvplib
from quantum.plugins.nicira import QuantumPlugin
from quantum.tests.unit.nicira import fake_nvpapiclient
def test_router_create_with_gwinfo_and_l3_ext_net_with_vlan(self):
self._test_router_create_with_gwinfo_and_l3_ext_net(444)
+ def test_router_create_nvp_error_returns_500(self, vlan_id=None):
+ with mock.patch.object(nvplib,
+ 'create_router_lport',
+ side_effect=NvpApiClient.NvpApiException):
+ with self._create_l3_ext_network(vlan_id) as net:
+ with self.subnet(network=net) as s:
+ data = {'router': {'tenant_id': 'whatever'}}
+ data['router']['name'] = 'router1'
+ data['router']['external_gateway_info'] = {
+ 'network_id': s['subnet']['network_id']}
+ router_req = self.new_create_request(
+ 'routers', data, self.fmt)
+ res = router_req.get_response(self.ext_api)
+ self.assertEqual(500, res.status_int)
+
def _test_router_update_gateway_on_l3_ext_net(self, vlan_id=None):
with self.router() as r:
with self.subnet() as s1:
def test_router_update_gateway_on_l3_ext_net_with_vlan(self):
self._test_router_update_gateway_on_l3_ext_net(444)
+ def test_router_list_by_tenant_id(self):
+ with contextlib.nested(self.router(tenant_id='custom'),
+ self.router(),
+ self.router()
+ ) as routers:
+ self._test_list_resources('router', [routers[0]],
+ query_params="tenant_id=custom")
+
def test_create_l3_ext_network_with_vlan(self):
self._test_create_l3_ext_network(666)
# Test that route is deleted after dhcp port is removed
self.assertEquals(len(subnets[0]['host_routes']), 0)
+ def test_floatingip_disassociate(self):
+ with self.port() as p:
+ private_sub = {'subnet': {'id':
+ p['port']['fixed_ips'][0]['subnet_id']}}
+ with self.floatingip_no_assoc(private_sub) as fip:
+ port_id = p['port']['id']
+ body = self._update('floatingips', fip['floatingip']['id'],
+ {'floatingip': {'port_id': port_id}})
+ self.assertEqual(body['floatingip']['port_id'], port_id)
+ # Disassociate
+ body = self._update('floatingips', fip['floatingip']['id'],
+ {'floatingip': {'port_id': None}})
+ body = self._show('floatingips', fip['floatingip']['id'])
+ self.assertIsNone(body['floatingip']['port_id'])
+ self.assertIsNone(body['floatingip']['fixed_ip_address'])
+
class NvpQoSTestExtensionManager(object):
res = self._create_port('json', net1['network']['id'])
port = self.deserialize('json', res)
self.fc._fake_lswitch_lport_dict.clear()
+ self.fc._fake_lswitch_lportstatus_dict.clear()
req = self.new_show_request('ports', port['port']['id'])
net = self.deserialize('json', req.get_response(self.api))
self.assertEqual(net['port']['status'],
# Assert Quantum name is not truncated
self.assertEqual(nw_gw[self.resource]['name'], name)
+ def test_create_network_gateway_nvp_error_returns_500(self):
+ def raise_nvp_api_exc(*args, **kwargs):
+ raise NvpApiClient.NvpApiException
+
+ with mock.patch.object(nvplib,
+ 'create_l2_gw_service',
+ new=raise_nvp_api_exc):
+ res = self._create_network_gateway(
+ self.fmt, 'xxx', name='yyy',
+ devices=[{'id': uuidutils.generate_uuid()}])
+ self.assertEqual(500, res.status_int)
+
def test_list_network_gateways(self):
with self._network_gateway(name='test-gw-1') as gw1:
with self._network_gateway(name='test_gw_2') as gw2:
import mock
import os
-from quantum.openstack.common import jsonutils as json
+from quantum.common import constants
+from quantum.common import exceptions
import quantum.plugins.nicira as nvp_plugin
+from quantum.plugins.nicira.common import config # noqa
+from quantum.plugins.nicira.common import exceptions as nvp_exc
from quantum.plugins.nicira import nvp_cluster
from quantum.plugins.nicira import NvpApiClient
from quantum.plugins.nicira import nvplib
self.addCleanup(self.fc.reset_all)
self.addCleanup(self.mock_nvpapi.stop)
+ def _build_tag_dict(self, tags):
+ # This syntax is needed for python 2.6 compatibility
+ return dict((t['scope'], t['tag']) for t in tags)
+
class TestNvplibNatRules(NvplibTestCase):
uri = nvplib._build_uri_path(nvplib.LROUTERNAT_RESOURCE,
nat_rule['uuid'],
lrouter['uuid'])
- return json.loads(nvplib.do_single_request("GET", uri,
- cluster=self.fake_cluster))
+ return nvplib.do_request("GET", uri, cluster=self.fake_cluster)
def test_create_lrouter_dnat_rule_v2(self):
resp_obj = self._test_create_lrouter_dnat_rule(
'new_hop')
-class NvplibL2GatewayTestCase(NvplibTestCase):
+class TestNvplibL2Gateway(NvplibTestCase):
- def _create_gw_service(self, node_uuid, display_name):
+ def _create_gw_service(self, node_uuid, display_name,
+ tenant_id='fake_tenant'):
return nvplib.create_l2_gw_service(self.fake_cluster,
- 'fake-tenant',
+ tenant_id,
display_name,
[{'id': node_uuid,
'interface_name': 'xxx'}])
self.assertEqual(len(results), 2)
self.assertEqual(sorted(gw_ids), sorted([r['uuid'] for r in results]))
+ def test_list_l2_gw_service_by_tenant(self):
+ gw_ids = [self._create_gw_service(
+ _uuid(), name, tenant_id=name)['uuid']
+ for name in ('fake-1', 'fake-2')]
+ results = nvplib.get_l2_gw_services(self.fake_cluster,
+ tenant_id='fake-1')
+ self.assertEqual(len(results), 1)
+ self.assertEqual(results[0]['uuid'], gw_ids[0])
+
def test_delete_l2_gw_service(self):
display_name = 'fake-gateway'
node_uuid = _uuid()
'fake-gw-port',
gw_id,
True)
- json.loads(nvplib.plug_l2_gw_service(self.fake_cluster,
- lswitch['uuid'],
- lport['uuid'],
- gw_id))
+ nvplib.plug_l2_gw_service(self.fake_cluster,
+ lswitch['uuid'],
+ lport['uuid'],
+ gw_id)
uri = nvplib._build_uri_path(nvplib.LSWITCHPORT_RESOURCE,
lport['uuid'],
lswitch['uuid'],
is_attachment=True)
- resp_obj = json.loads(
- nvplib.do_single_request("GET", uri,
- cluster=self.fake_cluster))
+ resp_obj = nvplib.do_request("GET", uri,
+ cluster=self.fake_cluster)
self.assertIn('LogicalPortAttachment', resp_obj)
self.assertEqual(resp_obj['LogicalPortAttachment']['type'],
'L2GatewayAttachment')
-class TestNvpLibLogicalPorts(NvplibTestCase):
+class TestNvplibLogicalSwitches(NvplibTestCase):
- def test_get_port_by_tag(self):
+ def test_create_and_get_lswitches_single(self):
tenant_id = 'pippo'
- quantum_port_id = 'whatever'
- lswitch = nvplib.create_lswitch(self.fake_cluster, tenant_id,
+ lswitch = nvplib.create_lswitch(self.fake_cluster,
+ tenant_id,
+ 'fake-switch')
+ res_lswitch = nvplib.get_lswitches(self.fake_cluster,
+ lswitch['uuid'])
+ self.assertEqual(len(res_lswitch), 1)
+ self.assertEqual(res_lswitch[0]['uuid'],
+ lswitch['uuid'])
+
+ def test_create_and_get_lswitches_single_name_exceeds_40_chars(self):
+ tenant_id = 'pippo'
+ lswitch = nvplib.create_lswitch(self.fake_cluster,
+ tenant_id,
+ '*' * 50)
+ res_lswitch = nvplib.get_lswitches(self.fake_cluster,
+ lswitch['uuid'])
+ self.assertEqual(len(res_lswitch), 1)
+ self.assertEqual(res_lswitch[0]['uuid'], lswitch['uuid'])
+ self.assertEqual(res_lswitch[0]['display_name'], '*' * 40)
+
+ def test_create_and_get_lswitches_multiple(self):
+ tenant_id = 'pippo'
+ main_lswitch = nvplib.create_lswitch(
+ self.fake_cluster, tenant_id, 'fake-switch',
+ tags=[{'scope': 'multi_lswitch', 'tag': 'True'}])
+ # Create secondary lswitch
+ nvplib.create_lswitch(
+ self.fake_cluster, tenant_id, 'fake-switch-2',
+ quantum_net_id=main_lswitch['uuid'])
+ res_lswitch = nvplib.get_lswitches(self.fake_cluster,
+ main_lswitch['uuid'])
+ self.assertEqual(len(res_lswitch), 2)
+ self.assertEqual(res_lswitch[0]['uuid'],
+ main_lswitch['uuid'])
+ switch_1_tags = self._build_tag_dict(res_lswitch[0]['tags'])
+ switch_2_tags = self._build_tag_dict(res_lswitch[1]['tags'])
+ self.assertIn('multi_lswitch', switch_1_tags)
+ self.assertNotIn('multi_lswitch', switch_2_tags)
+ self.assertNotIn('quantum_net_id', switch_1_tags)
+ self.assertIn('quantum_net_id', switch_2_tags)
+ self.assertEqual(switch_2_tags['quantum_net_id'],
+ main_lswitch['uuid'])
+
+ def test_update_lswitch(self):
+ new_name = 'new-name'
+ new_tags = [{'scope': 'new_tag', 'tag': 'xxx'}]
+ lswitch = nvplib.create_lswitch(self.fake_cluster,
+ 'pippo',
'fake-switch')
+ nvplib.update_lswitch(self.fake_cluster, lswitch['uuid'],
+ new_name, tags=new_tags)
+ res_lswitch = nvplib.get_lswitches(self.fake_cluster,
+ lswitch['uuid'])
+ self.assertEqual(len(res_lswitch), 1)
+ self.assertEqual(res_lswitch[0]['display_name'], new_name)
+ switch_tags = self._build_tag_dict(res_lswitch[0]['tags'])
+ self.assertIn('new_tag', switch_tags)
+ self.assertEqual(switch_tags['new_tag'], 'xxx')
+
+ def test_update_non_existing_lswitch_raises(self):
+ self.assertRaises(exceptions.NetworkNotFound,
+ nvplib.update_lswitch,
+ self.fake_cluster, 'whatever',
+ 'foo', 'bar')
+
+ def test_delete_networks(self):
+ lswitch = nvplib.create_lswitch(self.fake_cluster,
+ 'pippo',
+ 'fake-switch')
+ nvplib.delete_networks(self.fake_cluster, lswitch['uuid'],
+ [lswitch['uuid']])
+ self.assertRaises(exceptions.NotFound,
+ nvplib.get_lswitches,
+ self.fake_cluster,
+ lswitch['uuid'])
+
+ def test_delete_non_existing_lswitch_raises(self):
+ self.assertRaises(exceptions.NetworkNotFound,
+ nvplib.delete_networks,
+ self.fake_cluster, 'whatever', ['whatever'])
+
+
+class TestNvplibLogicalRouters(NvplibTestCase):
+
+ def _verify_lrouter(self, res_lrouter,
+ expected_uuid,
+ expected_display_name,
+ expected_nexthop,
+ expected_tenant_id):
+ self.assertEqual(res_lrouter['uuid'], expected_uuid)
+ nexthop = (res_lrouter['routing_config']
+ ['default_route_next_hop']['gateway_ip_address'])
+ self.assertEqual(nexthop, expected_nexthop)
+ router_tags = self._build_tag_dict(res_lrouter['tags'])
+ self.assertIn('os_tid', router_tags)
+ self.assertEqual(res_lrouter['display_name'], expected_display_name)
+ self.assertEqual(expected_tenant_id, router_tags['os_tid'])
+
+ def test_get_lrouters(self):
+ lrouter_uuids = [nvplib.create_lrouter(
+ self.fake_cluster, 'pippo', 'fake-lrouter-%s' % k,
+ '10.0.0.1')['uuid'] for k in range(0, 3)]
+ routers = nvplib.get_lrouters(self.fake_cluster, 'pippo')
+ for router in routers:
+ self.assertIn(router['uuid'], lrouter_uuids)
+
+ def test_create_and_get_lrouter(self):
+ lrouter = nvplib.create_lrouter(self.fake_cluster,
+ 'pippo',
+ 'fake-lrouter',
+ '10.0.0.1')
+ res_lrouter = nvplib.get_lrouter(self.fake_cluster,
+ lrouter['uuid'])
+ self._verify_lrouter(res_lrouter, lrouter['uuid'],
+ 'fake-lrouter', '10.0.0.1', 'pippo')
+
+ def test_create_and_get_lrouter_name_exceeds_40chars(self):
+ display_name = '*' * 50
+ lrouter = nvplib.create_lrouter(self.fake_cluster,
+ 'pippo',
+ display_name,
+ '10.0.0.1')
+ res_lrouter = nvplib.get_lrouter(self.fake_cluster,
+ lrouter['uuid'])
+ self._verify_lrouter(res_lrouter, lrouter['uuid'],
+ '*' * 40, '10.0.0.1', 'pippo')
+
+ def test_update_lrouter_no_nexthop(self):
+ lrouter = nvplib.create_lrouter(self.fake_cluster,
+ 'pippo',
+ 'fake-lrouter',
+ '10.0.0.1')
+ lrouter = nvplib.update_lrouter(self.fake_cluster,
+ lrouter['uuid'],
+ 'new_name',
+ None)
+ res_lrouter = nvplib.get_lrouter(self.fake_cluster,
+ lrouter['uuid'])
+ self._verify_lrouter(res_lrouter, lrouter['uuid'],
+ 'new_name', '10.0.0.1', 'pippo')
+
+ def test_update_lrouter(self):
+ lrouter = nvplib.create_lrouter(self.fake_cluster,
+ 'pippo',
+ 'fake-lrouter',
+ '10.0.0.1')
+ lrouter = nvplib.update_lrouter(self.fake_cluster,
+ lrouter['uuid'],
+ 'new_name',
+ '192.168.0.1')
+ res_lrouter = nvplib.get_lrouter(self.fake_cluster,
+ lrouter['uuid'])
+ self._verify_lrouter(res_lrouter, lrouter['uuid'],
+ 'new_name', '192.168.0.1', 'pippo')
+
+ def test_update_nonexistent_lrouter_raises(self):
+ self.assertRaises(exceptions.NotFound,
+ nvplib.update_lrouter,
+ self.fake_cluster, 'whatever',
+ 'foo', '9.9.9.9')
+
+ def test_delete_lrouter(self):
+ lrouter = nvplib.create_lrouter(self.fake_cluster,
+ 'pippo',
+ 'fake-lrouter',
+ '10.0.0.1')
+ nvplib.delete_lrouter(self.fake_cluster, lrouter['uuid'])
+ self.assertRaises(exceptions.NotFound,
+ nvplib.get_lrouter,
+ self.fake_cluster,
+ lrouter['uuid'])
+
+ def test_query_lrouter_ports(self):
+ lrouter = nvplib.create_lrouter(self.fake_cluster,
+ 'pippo',
+ 'fake-lrouter',
+ '10.0.0.1')
+ router_port_uuids = [nvplib.create_router_lport(
+ self.fake_cluster, lrouter['uuid'], 'pippo',
+ 'qp_id_%s' % k, 'port-%s' % k, True,
+ ['192.168.0.%s' % k])['uuid'] for k in range(0, 3)]
+ ports = nvplib.query_lrouter_lports(self.fake_cluster, lrouter['uuid'])
+ self.assertEqual(len(ports), 3)
+ for res_port in ports:
+ self.assertIn(res_port['uuid'], router_port_uuids)
+
+ def test_query_lrouter_lports_nonexistent_lrouter_raises(self):
+ self.assertRaises(
+ exceptions.NotFound, nvplib.create_router_lport,
+ self.fake_cluster, 'booo', 'pippo', 'quantum_port_id',
+ 'name', True, ['192.168.0.1'])
+
+ def test_create_and_get_lrouter_port(self):
+ lrouter = nvplib.create_lrouter(self.fake_cluster,
+ 'pippo',
+ 'fake-lrouter',
+ '10.0.0.1')
+ nvplib.create_router_lport(
+ self.fake_cluster, lrouter['uuid'], 'pippo', 'quantum_port_id',
+ 'name', True, ['192.168.0.1'])
+ ports = nvplib.query_lrouter_lports(self.fake_cluster, lrouter['uuid'])
+ self.assertEqual(len(ports), 1)
+ res_port = ports[0]
+ port_tags = self._build_tag_dict(res_port['tags'])
+ self.assertEqual(['192.168.0.1'], res_port['ip_addresses'])
+ self.assertIn('os_tid', port_tags)
+ self.assertIn('q_port_id', port_tags)
+ self.assertEqual('pippo', port_tags['os_tid'])
+ self.assertEqual('quantum_port_id', port_tags['q_port_id'])
+
+ def test_create_lrouter_port_nonexistent_router_raises(self):
+ self.assertRaises(
+ exceptions.NotFound, nvplib.create_router_lport,
+ self.fake_cluster, 'booo', 'pippo', 'quantum_port_id',
+ 'name', True, ['192.168.0.1'])
+
+ def test_update_lrouter_port(self):
+ lrouter = nvplib.create_lrouter(self.fake_cluster,
+ 'pippo',
+ 'fake-lrouter',
+ '10.0.0.1')
+ lrouter_port = nvplib.create_router_lport(
+ self.fake_cluster, lrouter['uuid'], 'pippo', 'quantum_port_id',
+ 'name', True, ['192.168.0.1'])
+ nvplib.update_router_lport(
+ self.fake_cluster, lrouter['uuid'], lrouter_port['uuid'],
+ 'pippo', 'another_port_id', 'name', False,
+ ['192.168.0.1', '10.10.10.254'])
+
+ ports = nvplib.query_lrouter_lports(self.fake_cluster, lrouter['uuid'])
+ self.assertEqual(len(ports), 1)
+ res_port = ports[0]
+ port_tags = self._build_tag_dict(res_port['tags'])
+ self.assertEqual(['192.168.0.1', '10.10.10.254'],
+ res_port['ip_addresses'])
+ self.assertEqual('False', res_port['admin_status_enabled'])
+ self.assertIn('os_tid', port_tags)
+ self.assertIn('q_port_id', port_tags)
+ self.assertEqual('pippo', port_tags['os_tid'])
+ self.assertEqual('another_port_id', port_tags['q_port_id'])
+
+ def test_update_lrouter_port_nonexistent_router_raises(self):
+ self.assertRaises(
+ exceptions.NotFound, nvplib.update_router_lport,
+ self.fake_cluster, 'boo-router', 'boo-port', 'pippo',
+ 'quantum_port_id', 'name', True, ['192.168.0.1'])
+
+ def test_update_lrouter_port_nonexistent_port_raises(self):
+ lrouter = nvplib.create_lrouter(self.fake_cluster,
+ 'pippo',
+ 'fake-lrouter',
+ '10.0.0.1')
+ self.assertRaises(
+ exceptions.NotFound, nvplib.update_router_lport,
+ self.fake_cluster, lrouter['uuid'], 'boo-port', 'pippo',
+ 'quantum_port_id', 'name', True, ['192.168.0.1'])
+
+ def test_delete_lrouter_port(self):
+ lrouter = nvplib.create_lrouter(self.fake_cluster,
+ 'pippo',
+ 'fake-lrouter',
+ '10.0.0.1')
+ lrouter_port = nvplib.create_router_lport(
+ self.fake_cluster, lrouter['uuid'], 'pippo', 'x', 'y', True, [])
+ ports = nvplib.query_lrouter_lports(self.fake_cluster, lrouter['uuid'])
+ self.assertEqual(len(ports), 1)
+ nvplib.delete_router_lport(self.fake_cluster, lrouter['uuid'],
+ lrouter_port['uuid'])
+ ports = nvplib.query_lrouter_lports(self.fake_cluster, lrouter['uuid'])
+ self.assertFalse(len(ports))
+
+ def test_delete_lrouter_port_nonexistent_router_raises(self):
+ self.assertRaises(exceptions.NotFound,
+ nvplib.delete_router_lport,
+ self.fake_cluster, 'xyz', 'abc')
+
+ def test_delete_lrouter_port_nonexistent_port_raises(self):
+ lrouter = nvplib.create_lrouter(self.fake_cluster,
+ 'pippo',
+ 'fake-lrouter',
+ '10.0.0.1')
+ self.assertRaises(exceptions.NotFound,
+ nvplib.delete_router_lport,
+ self.fake_cluster, lrouter['uuid'], 'abc')
+
+ def test_delete_peer_lrouter_port(self):
+ lrouter = nvplib.create_lrouter(self.fake_cluster,
+ 'pippo',
+ 'fake-lrouter',
+ '10.0.0.1')
+ lrouter_port = nvplib.create_router_lport(
+ self.fake_cluster, lrouter['uuid'], 'pippo', 'x', 'y', True, [])
+
+ def fakegetport(*args, **kwargs):
+ return {'_relations': {'LogicalPortAttachment':
+ {'peer_port_uuid': lrouter_port['uuid']}}}
+ # mock get_port
+ with mock.patch.object(nvplib, 'get_port', new=fakegetport):
+ nvplib.delete_peer_router_lport(self.fake_cluster,
+ lrouter_port['uuid'],
+ 'whatwever', 'whatever')
+
+ def test_update_lrouter_port_ips_add_only(self):
+ lrouter = nvplib.create_lrouter(self.fake_cluster,
+ 'pippo',
+ 'fake-lrouter',
+ '10.0.0.1')
+ lrouter_port = nvplib.create_router_lport(
+ self.fake_cluster, lrouter['uuid'], 'pippo', 'quantum_port_id',
+ 'name', True, ['192.168.0.1'])
+ nvplib.update_lrouter_port_ips(
+ self.fake_cluster, lrouter['uuid'], lrouter_port['uuid'],
+ ['10.10.10.254'], [])
+ ports = nvplib.query_lrouter_lports(self.fake_cluster, lrouter['uuid'])
+ self.assertEqual(len(ports), 1)
+ res_port = ports[0]
+ self.assertEqual(['10.10.10.254', '192.168.0.1'],
+ res_port['ip_addresses'])
+
+ def test_update_lrouter_port_ips_remove_only(self):
+ lrouter = nvplib.create_lrouter(self.fake_cluster,
+ 'pippo',
+ 'fake-lrouter',
+ '10.0.0.1')
+ lrouter_port = nvplib.create_router_lport(
+ self.fake_cluster, lrouter['uuid'], 'pippo', 'quantum_port_id',
+ 'name', True, ['192.168.0.1', '10.10.10.254'])
+ nvplib.update_lrouter_port_ips(
+ self.fake_cluster, lrouter['uuid'], lrouter_port['uuid'],
+ [], ['10.10.10.254'])
+ ports = nvplib.query_lrouter_lports(self.fake_cluster, lrouter['uuid'])
+ self.assertEqual(len(ports), 1)
+ res_port = ports[0]
+ self.assertEqual(['192.168.0.1'], res_port['ip_addresses'])
+
+ def test_update_lrouter_port_ips_add_and_remove(self):
+ lrouter = nvplib.create_lrouter(self.fake_cluster,
+ 'pippo',
+ 'fake-lrouter',
+ '10.0.0.1')
+ lrouter_port = nvplib.create_router_lport(
+ self.fake_cluster, lrouter['uuid'], 'pippo', 'quantum_port_id',
+ 'name', True, ['192.168.0.1'])
+ nvplib.update_lrouter_port_ips(
+ self.fake_cluster, lrouter['uuid'], lrouter_port['uuid'],
+ ['10.10.10.254'], ['192.168.0.1'])
+ ports = nvplib.query_lrouter_lports(self.fake_cluster, lrouter['uuid'])
+ self.assertEqual(len(ports), 1)
+ res_port = ports[0]
+ self.assertEqual(['10.10.10.254'], res_port['ip_addresses'])
+
+ def test_update_lrouter_port_ips_nonexistent_router_raises(self):
+ self.assertRaises(
+ nvp_exc.NvpPluginException, nvplib.update_lrouter_port_ips,
+ self.fake_cluster, 'boo-router', 'boo-port', [], [])
+
+ def test_update_lrouter_port_ips_nvp_exception_raises(self):
+ lrouter = nvplib.create_lrouter(self.fake_cluster,
+ 'pippo',
+ 'fake-lrouter',
+ '10.0.0.1')
+ lrouter_port = nvplib.create_router_lport(
+ self.fake_cluster, lrouter['uuid'], 'pippo', 'quantum_port_id',
+ 'name', True, ['192.168.0.1'])
+
+ def raise_nvp_exc(*args, **kwargs):
+ raise NvpApiClient.NvpApiException()
+
+ with mock.patch.object(nvplib, 'do_request', new=raise_nvp_exc):
+ self.assertRaises(
+ nvp_exc.NvpPluginException, nvplib.update_lrouter_port_ips,
+ self.fake_cluster, lrouter['uuid'],
+ lrouter_port['uuid'], [], [])
+
+ def test_plug_lrouter_port_patch_attachment(self):
+ tenant_id = 'pippo'
+ lswitch = nvplib.create_lswitch(self.fake_cluster,
+ tenant_id, 'fake-switch')
+ lport = nvplib.create_lport(self.fake_cluster, lswitch['uuid'],
+ tenant_id, 'xyz',
+ 'name', 'device_id', True)
+ lrouter = nvplib.create_lrouter(self.fake_cluster,
+ tenant_id,
+ 'fake-lrouter',
+ '10.0.0.1')
+ lrouter_port = nvplib.create_router_lport(
+ self.fake_cluster, lrouter['uuid'], 'pippo', 'quantum_port_id',
+ 'name', True, ['192.168.0.1'])
+ result = nvplib.plug_router_port_attachment(
+ self.fake_cluster, lrouter['uuid'],
+ lrouter_port['uuid'],
+ lport['uuid'], 'PatchAttachment')
+ self.assertEqual(lport['uuid'],
+ result['LogicalPortAttachment']['peer_port_uuid'])
+
+ def test_plug_lrouter_port_l3_gw_attachment(self):
+ lrouter = nvplib.create_lrouter(self.fake_cluster,
+ 'pippo',
+ 'fake-lrouter',
+ '10.0.0.1')
+ lrouter_port = nvplib.create_router_lport(
+ self.fake_cluster, lrouter['uuid'], 'pippo', 'quantum_port_id',
+ 'name', True, ['192.168.0.1'])
+ result = nvplib.plug_router_port_attachment(
+ self.fake_cluster, lrouter['uuid'],
+ lrouter_port['uuid'],
+ 'gw_att', 'L3GatewayAttachment')
+ self.assertEqual(
+ 'gw_att',
+ result['LogicalPortAttachment']['l3_gateway_service_uuid'])
+
+ def test_plug_lrouter_port_l3_gw_attachment_with_vlan(self):
+ lrouter = nvplib.create_lrouter(self.fake_cluster,
+ 'pippo',
+ 'fake-lrouter',
+ '10.0.0.1')
+ lrouter_port = nvplib.create_router_lport(
+ self.fake_cluster, lrouter['uuid'], 'pippo', 'quantum_port_id',
+ 'name', True, ['192.168.0.1'])
+ result = nvplib.plug_router_port_attachment(
+ self.fake_cluster, lrouter['uuid'],
+ lrouter_port['uuid'],
+ 'gw_att', 'L3GatewayAttachment', 123)
+ self.assertEqual(
+ 'gw_att',
+ result['LogicalPortAttachment']['l3_gateway_service_uuid'])
+ self.assertEqual(
+ '123',
+ result['LogicalPortAttachment']['vlan_id'])
+
+ def test_plug_lrouter_port_invalid_attachment_type_raises(self):
+ lrouter = nvplib.create_lrouter(self.fake_cluster,
+ 'pippo',
+ 'fake-lrouter',
+ '10.0.0.1')
+ lrouter_port = nvplib.create_router_lport(
+ self.fake_cluster, lrouter['uuid'], 'pippo', 'quantum_port_id',
+ 'name', True, ['192.168.0.1'])
+ self.assertRaises(Exception,
+ nvplib.plug_router_port_attachment,
+ self.fake_cluster, lrouter['uuid'],
+ lrouter_port['uuid'], 'gw_att', 'BadType')
+
+ def _test_create_router_snat_rule(self, version):
+ lrouter = nvplib.create_lrouter(self.fake_cluster,
+ 'pippo',
+ 'fake-lrouter',
+ '10.0.0.1')
+ with mock.patch.object(self.fake_cluster.api_client,
+ 'get_nvp_version',
+ new=lambda: version):
+ nvplib.create_lrouter_snat_rule(
+ self.fake_cluster, lrouter['uuid'],
+ '10.0.0.2', '10.0.0.2', order=200,
+ match_criteria={'source_ip_addresses': '192.168.0.24'})
+ rules = nvplib.query_nat_rules(self.fake_cluster, lrouter['uuid'])
+ self.assertEqual(len(rules), 1)
+
+ def test_create_router_snat_rule_v3(self):
+ self._test_create_router_snat_rule('3.0')
+
+ def test_create_router_snat_rule_v2(self):
+ self._test_create_router_snat_rule('2.0')
+
+ def _test_create_router_dnat_rule(self, version, dest_port=None):
+ lrouter = nvplib.create_lrouter(self.fake_cluster,
+ 'pippo',
+ 'fake-lrouter',
+ '10.0.0.1')
+ with mock.patch.object(self.fake_cluster.api_client,
+ 'get_nvp_version',
+ return_value=version):
+ nvplib.create_lrouter_dnat_rule(
+ self.fake_cluster, lrouter['uuid'], '192.168.0.2', order=200,
+ dest_port=dest_port,
+ match_criteria={'destination_ip_addresses': '10.0.0.3'})
+ rules = nvplib.query_nat_rules(self.fake_cluster, lrouter['uuid'])
+ self.assertEqual(len(rules), 1)
+
+ def test_create_router_dnat_rule_v3(self):
+ self._test_create_router_dnat_rule('3.0')
+
+ def test_create_router_dnat_rule_v2(self):
+ self._test_create_router_dnat_rule('2.0')
+
+ def test_create_router_dnat_rule_v2_with_destination_port(self):
+ self._test_create_router_dnat_rule('2.0', 8080)
+
+ def test_create_router_dnat_rule_v3_with_destination_port(self):
+ self._test_create_router_dnat_rule('3.0', 8080)
+
+ def test_create_router_snat_rule_invalid_match_keys_raises(self):
+ # In this case the version does not make a difference
+ lrouter = nvplib.create_lrouter(self.fake_cluster,
+ 'pippo',
+ 'fake-lrouter',
+ '10.0.0.1')
+
+ with mock.patch.object(self.fake_cluster.api_client,
+ 'get_nvp_version',
+ new=lambda: '2.0'):
+ self.assertRaises(Exception,
+ nvplib.create_lrouter_snat_rule,
+ self.fake_cluster, lrouter['uuid'],
+ '10.0.0.2', '10.0.0.2', order=200,
+ match_criteria={'foo': 'bar'})
+
+ def _test_create_router_nosnat_rule(self, version, expected=1):
+ lrouter = nvplib.create_lrouter(self.fake_cluster,
+ 'pippo',
+ 'fake-lrouter',
+ '10.0.0.1')
+ with mock.patch.object(self.fake_cluster.api_client,
+ 'get_nvp_version',
+ new=lambda: version):
+ nvplib.create_lrouter_nosnat_rule(
+ self.fake_cluster, lrouter['uuid'],
+ order=100,
+ match_criteria={'destination_ip_addresses': '192.168.0.0/24'})
+ rules = nvplib.query_nat_rules(self.fake_cluster, lrouter['uuid'])
+ # NoSNAT rules do not exist in V2
+ self.assertEqual(len(rules), expected)
+
+ def test_create_router_nosnat_rule_v2(self):
+ self._test_create_router_nosnat_rule('2.0', expected=0)
+
+ def test_create_router_nosnat_rule_v3(self):
+ self._test_create_router_nosnat_rule('3.0')
+
+ def _prepare_nat_rules_for_delete_tests(self):
+ lrouter = nvplib.create_lrouter(self.fake_cluster,
+ 'pippo',
+ 'fake-lrouter',
+ '10.0.0.1')
+ # v2 or v3 makes no difference for this test
+ with mock.patch.object(self.fake_cluster.api_client,
+ 'get_nvp_version',
+ new=lambda: '2.0'):
+ nvplib.create_lrouter_snat_rule(
+ self.fake_cluster, lrouter['uuid'],
+ '10.0.0.2', '10.0.0.2', order=220,
+ match_criteria={'source_ip_addresses': '192.168.0.0/24'})
+ nvplib.create_lrouter_snat_rule(
+ self.fake_cluster, lrouter['uuid'],
+ '10.0.0.3', '10.0.0.3', order=200,
+ match_criteria={'source_ip_addresses': '192.168.0.2/32'})
+ nvplib.create_lrouter_dnat_rule(
+ self.fake_cluster, lrouter['uuid'], '192.168.0.2', order=200,
+ match_criteria={'destination_ip_addresses': '10.0.0.3'})
+ return lrouter
+
+ def test_delete_router_nat_rules_by_match_on_destination_ip(self):
+ lrouter = self._prepare_nat_rules_for_delete_tests()
+ rules = nvplib.query_nat_rules(self.fake_cluster, lrouter['uuid'])
+ self.assertEqual(len(rules), 3)
+ nvplib.delete_nat_rules_by_match(
+ self.fake_cluster, lrouter['uuid'], 'DestinationNatRule', 1, 1,
+ destination_ip_addresses='10.0.0.3')
+ rules = nvplib.query_nat_rules(self.fake_cluster, lrouter['uuid'])
+ self.assertEqual(len(rules), 2)
+
+ def test_delete_router_nat_rules_by_match_on_source_ip(self):
+ lrouter = self._prepare_nat_rules_for_delete_tests()
+ rules = nvplib.query_nat_rules(self.fake_cluster, lrouter['uuid'])
+ self.assertEqual(len(rules), 3)
+ nvplib.delete_nat_rules_by_match(
+ self.fake_cluster, lrouter['uuid'], 'SourceNatRule', 1, 1,
+ source_ip_addresses='192.168.0.2/32')
+ rules = nvplib.query_nat_rules(self.fake_cluster, lrouter['uuid'])
+ self.assertEqual(len(rules), 2)
+
+ def test_delete_router_nat_rules_by_match_no_match_expected(self):
+ lrouter = self._prepare_nat_rules_for_delete_tests()
+ rules = nvplib.query_nat_rules(self.fake_cluster, lrouter['uuid'])
+ self.assertEqual(len(rules), 3)
+ nvplib.delete_nat_rules_by_match(
+ self.fake_cluster, lrouter['uuid'], 'SomeWeirdType', 0)
+ rules = nvplib.query_nat_rules(self.fake_cluster, lrouter['uuid'])
+ self.assertEqual(len(rules), 3)
+ nvplib.delete_nat_rules_by_match(
+ self.fake_cluster, lrouter['uuid'], 'DestinationNatRule', 0,
+ destination_ip_addresses='99.99.99.99')
+ rules = nvplib.query_nat_rules(self.fake_cluster, lrouter['uuid'])
+ self.assertEqual(len(rules), 3)
+
+ def test_delete_router_nat_rules_by_match_no_match_raises(self):
+ lrouter = self._prepare_nat_rules_for_delete_tests()
+ rules = nvplib.query_nat_rules(self.fake_cluster, lrouter['uuid'])
+ self.assertEqual(len(rules), 3)
+ self.assertRaises(
+ nvp_exc.NvpNatRuleMismatch,
+ nvplib.delete_nat_rules_by_match,
+ self.fake_cluster, lrouter['uuid'],
+ 'SomeWeirdType', 1, 1)
+
+
+class TestNvplibSecurityProfile(NvplibTestCase):
+
+ def test_create_and_get_security_profile(self):
+ sec_prof = nvplib.create_security_profile(self.fake_cluster,
+ 'pippo', {'name': 'test'})
+ sec_prof_res = nvplib.do_request(
+ nvplib.HTTP_GET,
+ nvplib._build_uri_path('security-profile',
+ resource_id=sec_prof['uuid']),
+ cluster=self.fake_cluster)
+ self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
+ # Check for builtin rules
+ self.assertEqual(len(sec_prof_res['logical_port_egress_rules']), 1)
+ self.assertEqual(len(sec_prof_res['logical_port_ingress_rules']), 2)
+
+ def test_create_and_get_default_security_profile(self):
+ sec_prof = nvplib.create_security_profile(self.fake_cluster,
+ 'pippo',
+ {'name': 'default'})
+ sec_prof_res = nvplib.do_request(
+ nvplib.HTTP_GET,
+ nvplib._build_uri_path('security-profile',
+ resource_id=sec_prof['uuid']),
+ cluster=self.fake_cluster)
+ self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
+ # Check for builtin rules
+ self.assertEqual(len(sec_prof_res['logical_port_egress_rules']), 3)
+ self.assertEqual(len(sec_prof_res['logical_port_ingress_rules']), 2)
+
+ def test_update_security_profile_rules(self):
+ sec_prof = nvplib.create_security_profile(self.fake_cluster,
+ 'pippo', {'name': 'test'})
+ ingress_rule = {'ethertype': 'IPv4'}
+ egress_rule = {'ethertype': 'IPv4', 'profile_uuid': 'xyz'}
+ new_rules = {'logical_port_egress_rules': [egress_rule],
+ 'logical_port_ingress_rules': [ingress_rule]}
+ nvplib.update_security_group_rules(self.fake_cluster,
+ sec_prof['uuid'],
+ new_rules)
+ sec_prof_res = nvplib.do_request(
+ nvplib.HTTP_GET,
+ nvplib._build_uri_path('security-profile',
+ resource_id=sec_prof['uuid']),
+ cluster=self.fake_cluster)
+ self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
+ # Check for builtin rules
+ self.assertEqual(len(sec_prof_res['logical_port_egress_rules']), 2)
+ self.assertIn(egress_rule,
+ sec_prof_res['logical_port_egress_rules'])
+ self.assertEqual(len(sec_prof_res['logical_port_ingress_rules']), 1)
+ self.assertIn(ingress_rule,
+ sec_prof_res['logical_port_ingress_rules'])
+
+ def test_update_security_profile_rules_noingress(self):
+ sec_prof = nvplib.create_security_profile(self.fake_cluster,
+ 'pippo', {'name': 'test'})
+ hidden_ingress_rule = {'ethertype': 'IPv4',
+ 'ip_prefix': '127.0.0.1/32'}
+ egress_rule = {'ethertype': 'IPv4', 'profile_uuid': 'xyz'}
+ new_rules = {'logical_port_egress_rules': [egress_rule],
+ 'logical_port_ingress_rules': []}
+ nvplib.update_security_group_rules(self.fake_cluster,
+ sec_prof['uuid'],
+ new_rules)
+ sec_prof_res = nvplib.do_request(
+ nvplib.HTTP_GET,
+ nvplib._build_uri_path('security-profile',
+ resource_id=sec_prof['uuid']),
+ cluster=self.fake_cluster)
+ self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
+ # Check for builtin rules
+ self.assertEqual(len(sec_prof_res['logical_port_egress_rules']), 2)
+ self.assertIn(egress_rule,
+ sec_prof_res['logical_port_egress_rules'])
+ self.assertEqual(len(sec_prof_res['logical_port_ingress_rules']), 1)
+ self.assertIn(hidden_ingress_rule,
+ sec_prof_res['logical_port_ingress_rules'])
+
+ def test_update_non_existing_securityprofile_raises(self):
+ self.assertRaises(exceptions.QuantumException,
+ nvplib.update_security_group_rules,
+ self.fake_cluster, 'whatever',
+ {'logical_port_egress_rules': [],
+ 'logical_port_ingress_rules': []})
+
+ def test_delete_security_profile(self):
+ sec_prof = nvplib.create_security_profile(self.fake_cluster,
+ 'pippo', {'name': 'test'})
+ nvplib.delete_security_profile(self.fake_cluster, sec_prof['uuid'])
+ self.assertRaises(exceptions.NotFound,
+ nvplib.do_request,
+ nvplib.HTTP_GET,
+ nvplib._build_uri_path(
+ 'security-profile',
+ resource_id=sec_prof['uuid']),
+ cluster=self.fake_cluster)
+
+ def test_delete_non_existing_securityprofile_raises(self):
+ self.assertRaises(exceptions.QuantumException,
+ nvplib.delete_security_profile,
+ self.fake_cluster, 'whatever')
+
+
+class TestNvplibLQueue(NvplibTestCase):
+
+ def test_create_and_get_lqueue(self):
+ queue_id = nvplib.create_lqueue(self.fake_cluster,
+ {'display_name': 'fake_queue',
+ 'min_bandwidth_rate': 0,
+ 'max_bandwidth_rate': 256,
+ 'dscp': 0,
+ 'qos_marking': False})
+ queue_res = nvplib.do_request(
+ nvplib.HTTP_GET,
+ nvplib._build_uri_path('lqueue', resource_id=queue_id),
+ cluster=self.fake_cluster)
+ self.assertEqual(queue_id, queue_res['uuid'])
+ self.assertEqual('fake_queue', queue_res['display_name'])
+
+ def test_create_lqueue_nvp_error_raises(self):
+ def raise_nvp_exc(*args, **kwargs):
+ raise NvpApiClient.NvpApiException()
+
+ with mock.patch.object(nvplib, 'do_request', new=raise_nvp_exc):
+ self.assertRaises(
+ exceptions.QuantumException, nvplib.create_lqueue,
+ self.fake_cluster, {})
+
+ def test_delete_lqueue(self):
+ queue_id = nvplib.create_lqueue(self.fake_cluster,
+ {'display_name': 'fake_queue',
+ 'min_bandwidth_rate': 0,
+ 'max_bandwidth_rate': 256,
+ 'dscp': 0,
+ 'qos_marking': False})
+ nvplib.delete_lqueue(self.fake_cluster, queue_id)
+ self.assertRaises(exceptions.NotFound,
+ nvplib.do_request,
+ nvplib.HTTP_GET,
+ nvplib._build_uri_path(
+ 'lqueue', resource_id=queue_id),
+ cluster=self.fake_cluster)
+
+ def test_delete_non_existing_lqueue_raises(self):
+ self.assertRaises(exceptions.QuantumException,
+ nvplib.delete_lqueue,
+ self.fake_cluster, 'whatever')
+
+
+class TestNvplibLogicalPorts(NvplibTestCase):
+
+ def _create_switch_and_port(self, tenant_id='pippo',
+ quantum_port_id='whatever'):
+ lswitch = nvplib.create_lswitch(self.fake_cluster,
+ tenant_id, 'fake-switch')
lport = nvplib.create_lport(self.fake_cluster, lswitch['uuid'],
tenant_id, quantum_port_id,
'name', 'device_id', True)
+ return lswitch, lport
+
+ def test_create_and_get_port(self):
+ lswitch, lport = self._create_switch_and_port()
+ lport_res = nvplib.get_port(self.fake_cluster,
+ lswitch['uuid'], lport['uuid'])
+ self.assertEqual(lport['uuid'], lport_res['uuid'])
+ # Try again with relation
+ lport_res = nvplib.get_port(self.fake_cluster,
+ lswitch['uuid'], lport['uuid'],
+ relations='LogicalPortStatus')
+ self.assertEqual(lport['uuid'], lport_res['uuid'])
+
+ def test_plug_interface(self):
+ lswitch, lport = self._create_switch_and_port()
+ nvplib.plug_interface(self.fake_cluster, lswitch['uuid'],
+ lport['uuid'], 'VifAttachment', 'fake')
+ lport_res = nvplib.get_port(self.fake_cluster,
+ lswitch['uuid'], lport['uuid'])
+ self.assertEqual(lport['uuid'], lport_res['uuid'])
+
+ def test_get_port_by_tag(self):
+ lswitch, lport = self._create_switch_and_port()
lport2 = nvplib.get_port_by_quantum_tag(self.fake_cluster,
lswitch['uuid'],
- quantum_port_id)
+ 'whatever')
self.assertIsNotNone(lport2)
self.assertEqual(lport['uuid'], lport2['uuid'])
lswitch['uuid'],
quantum_port_id)
self.assertIsNone(lport)
+
+ def test_get_port_status(self):
+ lswitch, lport = self._create_switch_and_port()
+ status = nvplib.get_port_status(self.fake_cluster,
+ lswitch['uuid'],
+ lport['uuid'])
+ self.assertEqual(constants.PORT_STATUS_ACTIVE, status)
+
+ def test_get_port_status_non_existent_raises(self):
+ self.assertRaises(exceptions.PortNotFound,
+ nvplib.get_port_status,
+ self.fake_cluster,
+ 'boo', 'boo')
+
+ def test_update_port(self):
+ lswitch, lport = self._create_switch_and_port()
+ nvplib.update_port(
+ self.fake_cluster, lswitch['uuid'], lport['uuid'],
+ 'quantum_port_id', 'pippo2', 'new_name', 'device_id', False)
+ lport_res = nvplib.get_port(self.fake_cluster,
+ lswitch['uuid'], lport['uuid'])
+ self.assertEqual(lport['uuid'], lport_res['uuid'])
+ self.assertEqual('new_name', lport_res['display_name'])
+ self.assertEqual('False', lport_res['admin_status_enabled'])
+ port_tags = self._build_tag_dict(lport_res['tags'])
+ self.assertIn('os_tid', port_tags)
+ self.assertIn('q_port_id', port_tags)
+ self.assertIn('vm_id', port_tags)
+
+ def test_update_non_existent_port_raises(self):
+ self.assertRaises(exceptions.PortNotFound,
+ nvplib.update_port, self.fake_cluster,
+ 'boo', 'boo', 'boo', 'boo', 'boo', 'boo', False)
+
+ def test_delete_port(self):
+ lswitch, lport = self._create_switch_and_port()
+ nvplib.delete_port(self.fake_cluster,
+ lswitch['uuid'], lport['uuid'])
+ self.assertRaises(exceptions.PortNotFound,
+ nvplib.get_port, self.fake_cluster,
+ lswitch['uuid'], lport['uuid'])
+
+ def test_delete_non_existent_port_raises(self):
+ lswitch = self._create_switch_and_port()[0]
+ self.assertRaises(exceptions.PortNotFound,
+ nvplib.delete_port, self.fake_cluster,
+ lswitch['uuid'], 'bad_port_uuid')
+
+ def test_query_lswitch_ports(self):
+ lswitch, lport = self._create_switch_and_port()
+ switch_port_uuids = [
+ nvplib.create_lport(
+ self.fake_cluster, lswitch['uuid'], 'pippo', 'qportid-%s' % k,
+ 'port-%s' % k, 'deviceid-%s' % k, True)['uuid']
+ for k in range(0, 2)]
+ switch_port_uuids.append(lport['uuid'])
+ ports = nvplib.query_lswitch_lports(self.fake_cluster, lswitch['uuid'])
+ self.assertEqual(len(ports), 3)
+ for res_port in ports:
+ self.assertIn(res_port['uuid'], switch_port_uuids)
+
+
+class TestNvplibClusterVersion(NvplibTestCase):
+
+ def test_get_cluster_version(self):
+
+ def fakedorequest(*args, **kwargs):
+ uri = args[1]
+ if 'node/xyz' in uri:
+ return {'version': '3.0.9999'}
+ elif 'node' in uri:
+ return {'result_count': 1,
+ 'results': [{'uuid': 'xyz'}]}
+
+ # mock do_request
+ with mock.patch.object(nvplib, 'do_request', new=fakedorequest):
+ version = nvplib.get_cluster_version('whatever')
+ self.assertEqual(version, '3.0')
+
+ def test_get_cluster_version_no_nodes(self):
+ def fakedorequest(*args, **kwargs):
+ uri = args[1]
+ if 'node' in uri:
+ return {'result_count': 0}
+
+ # mock do_request
+ with mock.patch.object(nvplib, 'do_request', new=fakedorequest):
+ version = nvplib.get_cluster_version('whatever')
+ self.assertIsNone(version)