from neutron.plugins.bigswitch import extensions
from neutron.plugins.bigswitch import routerrule_db
from neutron.plugins.bigswitch import servermanager
-from neutron.plugins.bigswitch.version import version_string_with_vcs
+from neutron.plugins.bigswitch import version
LOG = logging.getLogger(__name__)
def __init__(self, server_timeout=None):
super(NeutronRestProxyV2, self).__init__()
LOG.info(_('NeutronRestProxy: Starting plugin. Version=%s'),
- version_string_with_vcs())
+ version.version_string_with_vcs())
pl_config.register_config()
self.evpool = eventlet.GreenPool(cfg.CONF.RESTPROXY.thread_pool_size)
import json
import re
-from six.moves import xrange
-from wsgiref.simple_server import make_server
+from six import moves
+from wsgiref import simple_server
class TestNetworkCtrl(object):
def request_handler(self, method, uri, body):
retstatus = self.default_status
retbody = self.default_response
- for i in xrange(len(self.matches)):
+ for i in moves.xrange(len(self.matches)):
(prior, method_regexp, uri_regexp, handler, data, multi) = \
self.matches[i]
if re.match(method_regexp, method) and re.match(uri_regexp, uri):
print('%s: %s' % ('Response',
json.dumps(body_data, sort_keys=True, indent=4)))
return body
- return make_server(self.host, self.port, app)
+ return simple_server.make_server(self.host, self.port, app)
def run(self):
print("Serving on port %d ..." % self.port)
"""Determine version of NeutronRestProxy plugin"""
from __future__ import print_function
-# if vcsversion exists, use it. Else, use LOCALBRANCH:LOCALREVISION
-try:
- from neutron.plugins.bigswitch.vcsversion import version_info
-except ImportError:
- version_info = {'branch_nick': u'LOCALBRANCH',
- 'revision_id': u'LOCALREVISION',
- 'revno': 0}
-try:
- from neutron.plugins.bigswitch.vcsversion import NeutronRestPROXY_VERSION
-except ImportError:
- NeutronRestPROXY_VERSION = ['2013', '1', None]
-try:
- from neutron.plugins.bigswitch.vcsversion import FINAL
-except ImportError:
- FINAL = False # This becomes true at Release Candidate time
+from neutron.plugins.bigswitch import vcsversion
-YEAR, COUNT, REVISION = NeutronRestPROXY_VERSION
+YEAR, COUNT, REVISION = vcsversion.NEUTRONRESTPROXY_VERSION
def canonical_version_string():
- return '.'.join(filter(None, NeutronRestPROXY_VERSION))
+ return '.'.join(filter(None,
+ vcsversion.NEUTRONRESTPROXY_VERSION))
def version_string():
- if FINAL:
+ if vcsversion.FINAL:
return canonical_version_string()
else:
return '%s-dev' % (canonical_version_string(),)
def vcs_version_string():
- return "%s:%s" % (version_info['branch_nick'], version_info['revision_id'])
+ return "%s:%s" % (vcsversion.version_info['branch_nick'],
+ vcsversion.version_info['revision_id'])
def version_string_with_vcs():
"""A Vlan Bitmap class to handle allocation/de-allocation of vlan ids."""
-from six.moves import xrange
+from six import moves
from neutron.common import constants
from neutron.plugins.brocade.db import models as brocade_db
min_vlan_search = vlan_id or MIN_VLAN
max_vlan_search = (vlan_id and vlan_id + 1) or MAX_VLAN
- for vlan in xrange(min_vlan_search, max_vlan_search):
+ for vlan in moves.xrange(min_vlan_search, max_vlan_search):
if vlan not in self.vlans:
self.vlans.add(vlan)
return vlan
from neutron.api import api_common as common
from neutron.api import extensions
-from neutron.manager import NeutronManager
+from neutron import manager
from neutron.plugins.cisco.common import cisco_exceptions as exception
from neutron.plugins.cisco.common import cisco_faults as faults
from neutron.plugins.cisco.extensions import _qos_view as qos_view
parent_resource = dict(member_name="tenant",
collection_name="extensions/csco/tenants")
- controller = QosController(NeutronManager.get_plugin())
+ controller = QosController(manager.NeutronManager.get_plugin())
return [extensions.ResourceExtension('qoss', controller,
parent=parent_resource)]
from neutron.plugins.cisco.common import config as conf
from neutron.plugins.cisco.db import network_db_v2 as cdb
from neutron.plugins.cisco.db import nexus_db_v2 as nxos_db
-from neutron.plugins.cisco.l2device_plugin_base import L2DevicePluginBase
+from neutron.plugins.cisco import l2device_plugin_base
LOG = logging.getLogger(__name__)
-class NexusPlugin(L2DevicePluginBase):
+class NexusPlugin(l2device_plugin_base.L2DevicePluginBase):
"""Nexus PlugIn Main Class."""
_networks = {}
#
# @author: Ivar Lazzaro, Embrane, Inc.
-from functools import wraps
+import functools
from heleosapi import exceptions as h_exc
else:
handler[event].append(f)
- @wraps(f)
+ @functools.wraps(f)
def wrapped_f(*args, **kwargs):
return f(*args, **kwargs)
return wrapped_f
# under the License.
# @author: Alessandro Pilotti, Cloudbase Solutions Srl
-from six.moves import xrange
+from six import moves
from sqlalchemy.orm import exc
from neutron.common import exceptions as n_exc
# physical network
vlan_ids = set()
for vlan_range in vlan_ranges:
- vlan_ids |= set(xrange(vlan_range[0], vlan_range[1] + 1))
+ vlan_ids |= set(moves.xrange(vlan_range[0],
+ vlan_range[1] + 1))
# remove from table unallocated vlans not currently allocatable
self._remove_non_allocatable_vlans(session,
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
-from neutron.db.models_v2 import model_base
+from neutron.db import model_base
class VlanAllocation(model_base.BASEV2):
from neutron.openstack.common import log as logging
from neutron.plugins.ibm.common import config # noqa
from neutron.plugins.ibm.common import constants
-from neutron.wsgi import Serializer
+from neutron import wsgi
LOG = logging.getLogger(__name__)
'''Serializes a dictionary with a single key.'''
if isinstance(data, dict):
- return Serializer().serialize(data, self.content_type())
+ return wsgi.Serializer().serialize(data, self.content_type())
elif data:
raise TypeError(_("unable to serialize object type: '%s'") %
type(data))
if status_code == httplib.NO_CONTENT:
return data
try:
- deserialized_data = Serializer(
+ deserialized_data = wsgi.Serializer(
metadata=self._s_meta).deserialize(data, self.content_type())
deserialized_data = deserialized_data['body']
except Exception:
# See the License for the specific language governing permissions and
# limitations under the License.
-from six.moves import xrange
+from six import moves
from sqlalchemy.orm import exc
from neutron.common import exceptions as n_exc
# physical network
vlan_ids = set()
for vlan_range in vlan_ranges:
- vlan_ids |= set(xrange(vlan_range[0], vlan_range[1] + 1))
+ vlan_ids |= set(moves.xrange(vlan_range[0], vlan_range[1] + 1))
# remove from table unallocated vlans not currently allocatable
if physical_network in allocations:
from neutron.extensions import portbindings
from neutron.openstack.common import log
from neutron.plugins.bigswitch import config as pl_config
-from neutron.plugins.bigswitch.plugin import NeutronRestProxyV2Base
+from neutron.plugins.bigswitch import plugin
from neutron.plugins.bigswitch import servermanager
from neutron.plugins.ml2 import driver_api as api
LOG = log.getLogger(__name__)
-class BigSwitchMechanismDriver(NeutronRestProxyV2Base,
+class BigSwitchMechanismDriver(plugin.NeutronRestProxyV2Base,
api.MechanismDriver):
"""Mechanism Driver for Big Switch Networks Controller.
# under the License.
from oslo.config import cfg
-from six.moves import xrange
+from six import moves
import sqlalchemy as sa
from sqlalchemy.orm import exc as sa_exc
"%(tun_min)s:%(tun_max)s"),
{'tun_min': tun_min, 'tun_max': tun_max})
else:
- gre_ids |= set(xrange(tun_min, tun_max + 1))
+ gre_ids |= set(moves.xrange(tun_min, tun_max + 1))
session = db_api.get_session()
with session.begin(subtransactions=True):
import sys
from oslo.config import cfg
-from six.moves import xrange
+from six import moves
import sqlalchemy as sa
from neutron.common import constants as q_const
# this physical network
vlan_ids = set()
for vlan_min, vlan_max in vlan_ranges:
- vlan_ids |= set(xrange(vlan_min, vlan_max + 1))
+ vlan_ids |= set(moves.xrange(vlan_min, vlan_max + 1))
# remove from table unallocated vlans not currently
# allocatable
from neutron.openstack.common import importutils
from neutron.openstack.common import jsonutils
from neutron.openstack.common import log as logging
-from neutron.plugins.mlnx.common.comm_utils import RetryDecorator
+from neutron.plugins.mlnx.common import comm_utils
from neutron.plugins.mlnx.common import exceptions
zmq = importutils.try_import('eventlet.green.zmq')
self.poller.register(self._conn, zmq.POLLIN)
return self.__conn
- @RetryDecorator(exceptions.RequestTimeout)
+ @comm_utils.RetryDecorator(exceptions.RequestTimeout)
def send_msg(self, msg):
self._conn.send(msg)
# See the License for the specific language governing permissions and
# limitations under the License.
-from six.moves import xrange
+from six import moves
from sqlalchemy.orm import exc
from neutron.common import exceptions as n_exc
# physical network
vlan_ids = set()
for vlan_range in vlan_ranges:
- vlan_ids |= set(xrange(vlan_range[0], vlan_range[1] + 1))
+ vlan_ids |= set(moves.xrange(vlan_range[0], vlan_range[1] + 1))
# remove from table unallocated vlans not currently allocatable
_remove_non_allocatable_vlans(session, allocations,
from neutron.api.v2 import base
from neutron.common import constants
from neutron.common import exceptions
-from neutron.manager import NeutronManager
+from neutron import manager
from neutron import quota
quota.QUOTAS.register_resource(qresource)
resource = base.create_resource(COLLECTION, RESOURCE,
- NeutronManager.get_plugin(),
+ manager.NeutronManager.get_plugin(),
PACKET_FILTER_ATTR_PARAMS)
pf_ext = extensions.ResourceExtension(
COLLECTION, resource, attr_map=PACKET_FILTER_ATTR_PARAMS)
#
# @author: Ronak Shah, Nuage Networks, Alcatel-Lucent USA Inc.
-from abc import abstractmethod
+import abc
from neutron.api import extensions
from neutron.api.v2 import base
class NetPartitionPluginBase(object):
- @abstractmethod
+ @abc.abstractmethod
def create_net_partition(self, context, router):
pass
- @abstractmethod
+ @abc.abstractmethod
def update_net_partition(self, context, id, router):
pass
- @abstractmethod
+ @abc.abstractmethod
def get_net_partition(self, context, id, fields=None):
pass
- @abstractmethod
+ @abc.abstractmethod
def delete_net_partition(self, context, id):
pass
- @abstractmethod
+ @abc.abstractmethod
def get_net_partitions(self, context, filters=None, fields=None):
pass
import httplib
import time
-from urlparse import urljoin
from oslo.config import cfg
import requests
+from six.moves.urllib import parse
from neutron.openstack.common import jsonutils as json
from neutron.openstack.common import log as logging
headers = {"Content-Type": "application/json"}
- login_url = urljoin(self.api_url,
- "/pluginhandler/ocplugin/authmgmt/login")
+ login_url = parse.urljoin(self.api_url,
+ "/pluginhandler/ocplugin/authmgmt/login")
data = json.dumps({"user_name": self._user, "passwd": self._password})
headers = {"Content-Type": content_type}
- uri = urljoin(url, "?authToken=%s" % self.auth_token)
+ uri = parse.urljoin(url, "?authToken=%s" % self.auth_token)
- url = urljoin(self.api_url, uri)
+ url = parse.urljoin(self.api_url, uri)
request_ok = False
response = None
import eventlet
import netaddr
from oslo.config import cfg
-from six.moves import xrange
+from six import moves
from neutron.agent import l2population_rpc
from neutron.agent.linux import ip_lib
'''
self.veth_mtu = veth_mtu
self.root_helper = root_helper
- self.available_local_vlans = set(xrange(q_const.MIN_VLAN_TAG,
- q_const.MAX_VLAN_TAG))
+ self.available_local_vlans = set(moves.xrange(q_const.MIN_VLAN_TAG,
+ q_const.MAX_VLAN_TAG))
self.tunnel_types = tunnel_types or []
self.l2_pop = l2_population
self.agent_state = {
# License for the specific language governing permissions and limitations
# under the License.
-from six.moves import xrange
+from six import moves
from sqlalchemy import func
from sqlalchemy.orm import exc
# physical network
vlan_ids = set()
for vlan_range in vlan_ranges:
- vlan_ids |= set(xrange(vlan_range[0], vlan_range[1] + 1))
+ vlan_ids |= set(moves.xrange(vlan_range[0], vlan_range[1] + 1))
# remove from table unallocated vlans not currently allocatable
if physical_network in allocations:
"%(tun_min)s:%(tun_max)s"),
{'tun_min': tun_min, 'tun_max': tun_max})
else:
- tunnel_ids |= set(xrange(tun_min, tun_max + 1))
+ tunnel_ids |= set(moves.xrange(tun_min, tun_max + 1))
session = db.get_session()
with session.begin():
# doesn't conflict with any other concurrently executed
# DB transactions in spite of the specified transactions
# isolation level value
- for i in xrange(max_retries):
+ for i in moves.xrange(max_retries):
LOG.debug(_('Adding a tunnel endpoint for %s'), ip)
try:
session = db.get_session()
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.schema import UniqueConstraint
+from neutron.db import model_base
from neutron.db import models_v2
-from neutron.db.models_v2 import model_base
from sqlalchemy import orm
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.plugins.plumgrid.common import exceptions as plum_excep
-from neutron.plugins.plumgrid.plumgrid_plugin.plugin_ver import VERSION
+from neutron.plugins.plumgrid.plumgrid_plugin import plugin_ver
LOG = logging.getLogger(__name__)
PLUM_DRIVER = 'neutron.plugins.plumgrid.drivers.plumlib.Plumlib'
"""
def _get_plugin_version(self):
- return VERSION
+ return plugin_ver.VERSION
def _port_viftype_binding(self, context, port):
port[portbindings.VIF_TYPE] = portbindings.VIF_TYPE_IOVISOR
from neutron.agent.linux import ip_lib
from neutron.agent.linux import ovs_lib
-from neutron.agent.linux.ovs_lib import VifPort
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import config as logging_config
return
ofport = self.get_ofport(name)
- return VifPort(name, ofport, None, None, self)
+ return ovs_lib.VifPort(name, ofport, None, None, self)
def get_external_ports(self):
return self._get_ports(self._get_external_port)
# License for the specific language governing permissions and limitations
# under the License.
-from abc import ABCMeta
+import abc
import httplib
import six
import time
from neutron.openstack.common import log as logging
-from neutron.plugins.vmware.api_client import ctrl_conn_to_str
+from neutron.plugins.vmware import api_client
LOG = logging.getLogger(__name__)
DEFAULT_CONNECT_TIMEOUT = 5
-@six.add_metaclass(ABCMeta)
+@six.add_metaclass(abc.ABCMeta)
class ApiClientBase(object):
"""An abstract baseclass for all API client implementations."""
if getattr(conn, 'last_used', now) < now - self.CONN_IDLE_TIMEOUT:
LOG.info(_("[%(rid)d] Connection %(conn)s idle for %(sec)0.2f "
"seconds; reconnecting."),
- {'rid': rid, 'conn': ctrl_conn_to_str(conn),
+ {'rid': rid, 'conn': api_client.ctrl_conn_to_str(conn),
'sec': now - conn.last_used})
conn = self._create_connection(*self._conn_params(conn))
qsize = self._conn_pool.qsize()
LOG.debug(_("[%(rid)d] Acquired connection %(conn)s. %(qsize)d "
"connection(s) available."),
- {'rid': rid, 'conn': ctrl_conn_to_str(conn), 'qsize': qsize})
+ {'rid': rid, 'conn': api_client.ctrl_conn_to_str(conn),
+ 'qsize': qsize})
if auto_login and self.auth_cookie(conn) is None:
self._wait_for_login(conn, headers)
return conn
if self._conn_params(http_conn) not in self._api_providers:
LOG.debug(_("[%(rid)d] Released connection %(conn)s is not an "
"API provider for the cluster"),
- {'rid': rid, 'conn': ctrl_conn_to_str(http_conn)})
+ {'rid': rid,
+ 'conn': api_client.ctrl_conn_to_str(http_conn)})
return
elif hasattr(http_conn, "no_release"):
return
# Reconnect to provider.
LOG.warn(_("[%(rid)d] Connection returned in bad state, "
"reconnecting to %(conn)s"),
- {'rid': rid, 'conn': ctrl_conn_to_str(http_conn)})
+ {'rid': rid,
+ 'conn': api_client.ctrl_conn_to_str(http_conn)})
http_conn = self._create_connection(*self._conn_params(http_conn))
priority = self._next_conn_priority
self._next_conn_priority += 1
self._conn_pool.put((priority, http_conn))
LOG.debug(_("[%(rid)d] Released connection %(conn)s. %(qsize)d "
"connection(s) available."),
- {'rid': rid, 'conn': ctrl_conn_to_str(http_conn),
+ {'rid': rid, 'conn': api_client.ctrl_conn_to_str(http_conn),
'qsize': self._conn_pool.qsize()})
def _wait_for_login(self, conn, headers=None):
data = self._get_provider_data(conn)
if data is None:
LOG.error(_("Login request for an invalid connection: '%s'"),
- ctrl_conn_to_str(conn))
+ api_client.ctrl_conn_to_str(conn))
return
provider_sem = data[0]
if provider_sem.acquire(blocking=False):
# under the License.
#
-from abc import ABCMeta
-from abc import abstractmethod
+import abc
import copy
import eventlet
import httplib
from neutron.openstack.common import excutils
from neutron.openstack.common import log as logging
-from neutron.plugins.vmware.api_client import ctrl_conn_to_str
+from neutron.plugins.vmware import api_client
LOG = logging.getLogger(__name__)
DOWNLOAD_TIMEOUT = 180
-@six.add_metaclass(ABCMeta)
+@six.add_metaclass(abc.ABCMeta)
class ApiRequest(object):
'''An abstract baseclass for all ApiRequest implementations.
httplib.SERVICE_UNAVAILABLE
]
- @abstractmethod
+ @abc.abstractmethod
def start(self):
pass
- @abstractmethod
+ @abc.abstractmethod
def join(self):
pass
- @abstractmethod
+ @abc.abstractmethod
def copy(self):
pass
def _request_str(self, conn, url):
'''Return string representation of connection.'''
- return "%s %s/%s" % (self._method, ctrl_conn_to_str(conn), url)
+ return "%s %s/%s" % (self._method, api_client.ctrl_conn_to_str(conn),
+ url)
import hashlib
-from neutron.api.v2.attributes import is_attr_set
+from neutron.api.v2 import attributes
from neutron.openstack.common import log
-from neutron.version import version_info
+from neutron import version
LOG = log.getLogger(__name__)
MAX_DISPLAY_NAME_LEN = 40
-NEUTRON_VERSION = version_info.release_string()
+NEUTRON_VERSION = version.version_info.release_string()
# Allowed network types for the NSX Plugin
def check_and_truncate(display_name):
- if is_attr_set(display_name) and len(display_name) > MAX_DISPLAY_NAME_LEN:
+ if (attributes.is_attr_set(display_name) and
+ len(display_name) > MAX_DISPLAY_NAME_LEN):
LOG.debug(_("Specified name:'%s' exceeds maximum length. "
"It will be truncated on NSX"), display_name)
return display_name[:MAX_DISPLAY_NAME_LEN]
# under the License.
#
-from abc import abstractmethod
+import abc
from oslo.config import cfg
from neutron.api.v2 import attributes
from neutron.api.v2 import resource_helper
-from neutron.plugins.vmware.common.utils import NetworkTypes
+from neutron.plugins.vmware.common import utils
GATEWAY_RESOURCE_NAME = "network_gateway"
DEVICE_RESOURCE_NAME = "gateway_device"
msg = _("A connector type is required to create a gateway device")
return msg
connector_types = (valid_values if valid_values else
- [NetworkTypes.GRE,
- NetworkTypes.STT,
- NetworkTypes.BRIDGE,
- 'ipsec%s' % NetworkTypes.GRE,
- 'ipsec%s' % NetworkTypes.STT])
+ [utils.NetworkTypes.GRE,
+ utils.NetworkTypes.STT,
+ utils.NetworkTypes.BRIDGE,
+ 'ipsec%s' % utils.NetworkTypes.GRE,
+ 'ipsec%s' % utils.NetworkTypes.STT])
if data not in connector_types:
msg = _("Unknown connector type: %s") % data
return msg
class NetworkGatewayPluginBase(object):
- @abstractmethod
+ @abc.abstractmethod
def create_network_gateway(self, context, network_gateway):
pass
- @abstractmethod
+ @abc.abstractmethod
def update_network_gateway(self, context, id, network_gateway):
pass
- @abstractmethod
+ @abc.abstractmethod
def get_network_gateway(self, context, id, fields=None):
pass
- @abstractmethod
+ @abc.abstractmethod
def delete_network_gateway(self, context, id):
pass
- @abstractmethod
+ @abc.abstractmethod
def get_network_gateways(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
pass
- @abstractmethod
+ @abc.abstractmethod
def connect_network(self, context, network_gateway_id,
network_mapping_info):
pass
- @abstractmethod
+ @abc.abstractmethod
def disconnect_network(self, context, network_gateway_id,
network_mapping_info):
pass
- @abstractmethod
+ @abc.abstractmethod
def create_gateway_device(self, context, gateway_device):
pass
- @abstractmethod
+ @abc.abstractmethod
def update_gateway_device(self, context, id, gateway_device):
pass
- @abstractmethod
+ @abc.abstractmethod
def delete_gateway_device(self, context, id):
pass
- @abstractmethod
+ @abc.abstractmethod
def get_gateway_device(self, context, id, fields=None):
pass
- @abstractmethod
+ @abc.abstractmethod
def get_gateway_devices(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
# under the License.
#
-from abc import abstractmethod
+import abc
from neutron.api import extensions
from neutron.api.v2 import attributes as attr
class QueuePluginBase(object):
- @abstractmethod
+ @abc.abstractmethod
def create_qos_queue(self, context, queue):
pass
- @abstractmethod
+ @abc.abstractmethod
def delete_qos_queue(self, context, id):
pass
- @abstractmethod
+ @abc.abstractmethod
def get_qos_queue(self, context, id, fields=None):
pass
- @abstractmethod
+ @abc.abstractmethod
def get_qos_queues(self, context, filters=None, fields=None, sorts=None,
limit=None, marker=None, page_reverse=False):
pass
from neutron.openstack.common import log
from neutron.plugins.vmware.api_client import exception as api_exc
from neutron.plugins.vmware.common import exceptions as nsx_exc
-from neutron.version import version_info
+from neutron import version
HTTP_GET = "GET"
HTTP_POST = "POST"
HTTP_PUT = "PUT"
# Prefix to be used for all NSX API calls
URI_PREFIX = "/ws.v1"
-NEUTRON_VERSION = version_info.release_string()
+NEUTRON_VERSION = version.version_info.release_string()
LOG = log.getLogger(__name__)
from neutron.plugins.vmware.api_client import exception as api_exc
from neutron.plugins.vmware.common import exceptions as nsx_exc
from neutron.plugins.vmware.common import utils
-from neutron.plugins.vmware.nsxlib import _build_uri_path
-from neutron.plugins.vmware.nsxlib import do_request
-from neutron.plugins.vmware.nsxlib import get_all_query_pages
+from neutron.plugins.vmware import nsxlib
from neutron.plugins.vmware.nsxlib import switch
HTTP_GET = "GET"
"gateways": gateways,
"type": "L2GatewayServiceConfig"
}
- return do_request(
- HTTP_POST, _build_uri_path(GWSERVICE_RESOURCE),
+ return nsxlib.do_request(
+ HTTP_POST, nsxlib._build_uri_path(GWSERVICE_RESOURCE),
json.dumps(gwservice_obj), cluster=cluster)
def get_l2_gw_service(cluster, gateway_id):
- return do_request(
- HTTP_GET, _build_uri_path(GWSERVICE_RESOURCE,
- resource_id=gateway_id),
+ return nsxlib.do_request(
+ HTTP_GET, nsxlib._build_uri_path(GWSERVICE_RESOURCE,
+ resource_id=gateway_id),
cluster=cluster)
if tenant_id:
actual_filters['tag'] = tenant_id
actual_filters['tag_scope'] = 'os_tid'
- return get_all_query_pages(
- _build_uri_path(GWSERVICE_RESOURCE,
- filters=actual_filters),
+ return nsxlib.get_all_query_pages(
+ nsxlib._build_uri_path(GWSERVICE_RESOURCE,
+ filters=actual_filters),
cluster)
# Nothing to update
return gwservice_obj
gwservice_obj["display_name"] = utils.check_and_truncate(display_name)
- return do_request(HTTP_PUT, _build_uri_path(GWSERVICE_RESOURCE,
- resource_id=gateway_id),
- json.dumps(gwservice_obj), cluster=cluster)
+ return nsxlib.do_request(HTTP_PUT,
+ nsxlib._build_uri_path(GWSERVICE_RESOURCE,
+ resource_id=gateway_id),
+ json.dumps(gwservice_obj), cluster=cluster)
def delete_l2_gw_service(cluster, gateway_id):
- do_request(HTTP_DELETE, _build_uri_path(GWSERVICE_RESOURCE,
- resource_id=gateway_id),
- cluster=cluster)
+ nsxlib.do_request(HTTP_DELETE,
+ nsxlib._build_uri_path(GWSERVICE_RESOURCE,
+ resource_id=gateway_id),
+ cluster=cluster)
def _build_gateway_device_body(tenant_id, display_name, neutron_id,
connector_type, connector_ip,
client_certificate, tz_uuid)
try:
- return do_request(
- HTTP_POST, _build_uri_path(TRANSPORTNODE_RESOURCE),
+ return nsxlib.do_request(
+ HTTP_POST, nsxlib._build_uri_path(TRANSPORTNODE_RESOURCE),
json.dumps(body), cluster=cluster)
except api_exc.InvalidSecurityCertificate:
raise nsx_exc.InvalidSecurityCertificate()
connector_type, connector_ip,
client_certificate, tz_uuid)
try:
- return do_request(
+ return nsxlib.do_request(
HTTP_PUT,
- _build_uri_path(TRANSPORTNODE_RESOURCE, resource_id=gateway_id),
+ nsxlib._build_uri_path(TRANSPORTNODE_RESOURCE,
+ resource_id=gateway_id),
json.dumps(body), cluster=cluster)
except api_exc.InvalidSecurityCertificate:
raise nsx_exc.InvalidSecurityCertificate()
def delete_gateway_device(cluster, device_uuid):
- return do_request(HTTP_DELETE,
- _build_uri_path(TRANSPORTNODE_RESOURCE,
- device_uuid),
- cluster=cluster)
+ return nsxlib.do_request(HTTP_DELETE,
+ nsxlib._build_uri_path(TRANSPORTNODE_RESOURCE,
+ device_uuid),
+ cluster=cluster)
def get_gateway_device_status(cluster, device_uuid):
- status_res = do_request(HTTP_GET,
- _build_uri_path(TRANSPORTNODE_RESOURCE,
- device_uuid,
- extra_action='status'),
- cluster=cluster)
+ status_res = nsxlib.do_request(HTTP_GET,
+ nsxlib._build_uri_path(
+ TRANSPORTNODE_RESOURCE,
+ device_uuid,
+ extra_action='status'),
+ cluster=cluster)
# Returns the connection status
return status_res['connection']['connected']
def get_gateway_devices_status(cluster, tenant_id=None):
if tenant_id:
- gw_device_query_path = _build_uri_path(
+ gw_device_query_path = nsxlib._build_uri_path(
TRANSPORTNODE_RESOURCE,
fields="uuid,tags",
relations="TransportNodeStatus",
filters={'tag': tenant_id,
'tag_scope': 'os_tid'})
else:
- gw_device_query_path = _build_uri_path(
+ gw_device_query_path = nsxlib._build_uri_path(
TRANSPORTNODE_RESOURCE,
fields="uuid,tags",
relations="TransportNodeStatus")
- response = get_all_query_pages(gw_device_query_path, cluster)
+ response = nsxlib.get_all_query_pages(gw_device_query_path, cluster)
results = {}
for item in response:
results[item['uuid']] = (item['_relations']['TransportNodeStatus']
from neutron.plugins.vmware.api_client import exception as api_exc
from neutron.plugins.vmware.common import exceptions as nsx_exc
from neutron.plugins.vmware.common import utils
-from neutron.plugins.vmware.nsxlib import _build_uri_path
-from neutron.plugins.vmware.nsxlib import do_request
+from neutron.plugins.vmware import nsxlib
HTTP_GET = "GET"
HTTP_POST = "POST"
try:
exists = (
svc_cluster_id and
- do_request(HTTP_GET,
- _build_uri_path(SERVICECLUSTER_RESOURCE,
- resource_id=svc_cluster_id),
- cluster=cluster) is not None)
+ nsxlib.do_request(HTTP_GET,
+ nsxlib._build_uri_path(
+ SERVICECLUSTER_RESOURCE,
+ resource_id=svc_cluster_id),
+ cluster=cluster) is not None)
except exception.NotFound:
pass
return exists
"edge_cluster_uuid": cluster.default_service_cluster_uuid,
"tags": utils.get_tags(n_network_id=network_id)
}
- return do_request(HTTP_POST,
- _build_uri_path(LSERVICESNODE_RESOURCE),
- json.dumps(lsn_obj),
- cluster=cluster)["uuid"]
+ return nsxlib.do_request(HTTP_POST,
+ nsxlib._build_uri_path(LSERVICESNODE_RESOURCE),
+ json.dumps(lsn_obj),
+ cluster=cluster)["uuid"]
def lsn_for_network_get(cluster, network_id):
filters = {"tag": network_id, "tag_scope": "n_network_id"}
- results = do_request(HTTP_GET,
- _build_uri_path(LSERVICESNODE_RESOURCE,
- fields="uuid",
- filters=filters),
- cluster=cluster)['results']
+ results = nsxlib.do_request(HTTP_GET,
+ nsxlib._build_uri_path(LSERVICESNODE_RESOURCE,
+ fields="uuid",
+ filters=filters),
+ cluster=cluster)['results']
if not results:
raise exception.NotFound()
elif len(results) == 1:
def lsn_delete(cluster, lsn_id):
- do_request(HTTP_DELETE,
- _build_uri_path(LSERVICESNODE_RESOURCE,
- resource_id=lsn_id),
- cluster=cluster)
+ nsxlib.do_request(HTTP_DELETE,
+ nsxlib._build_uri_path(LSERVICESNODE_RESOURCE,
+ resource_id=lsn_id),
+ cluster=cluster)
def lsn_port_host_entries_update(
cluster, lsn_id, lsn_port_id, conf, hosts_data):
hosts_obj = {'hosts': hosts_data}
- do_request(HTTP_PUT,
- _build_uri_path(LSERVICESNODEPORT_RESOURCE,
- parent_resource_id=lsn_id,
- resource_id=lsn_port_id,
- extra_action=conf),
- json.dumps(hosts_obj),
- cluster=cluster)
+ nsxlib.do_request(HTTP_PUT,
+ nsxlib._build_uri_path(LSERVICESNODEPORT_RESOURCE,
+ parent_resource_id=lsn_id,
+ resource_id=lsn_port_id,
+ extra_action=conf),
+ json.dumps(hosts_obj),
+ cluster=cluster)
def lsn_port_create(cluster, lsn_id, port_data):
n_subnet_id=port_data["subnet_id"]),
"type": "LogicalServicesNodePortConfig",
}
- return do_request(HTTP_POST,
- _build_uri_path(LSERVICESNODEPORT_RESOURCE,
- parent_resource_id=lsn_id),
- json.dumps(port_obj),
- cluster=cluster)["uuid"]
+ return nsxlib.do_request(HTTP_POST,
+ nsxlib._build_uri_path(LSERVICESNODEPORT_RESOURCE,
+ parent_resource_id=lsn_id),
+ json.dumps(port_obj),
+ cluster=cluster)["uuid"]
def lsn_port_delete(cluster, lsn_id, lsn_port_id):
- return do_request(HTTP_DELETE,
- _build_uri_path(LSERVICESNODEPORT_RESOURCE,
- parent_resource_id=lsn_id,
- resource_id=lsn_port_id),
- cluster=cluster)
+ return nsxlib.do_request(HTTP_DELETE,
+ nsxlib._build_uri_path(LSERVICESNODEPORT_RESOURCE,
+ parent_resource_id=lsn_id,
+ resource_id=lsn_port_id),
+ cluster=cluster)
def _lsn_port_get(cluster, lsn_id, filters):
- results = do_request(HTTP_GET,
- _build_uri_path(LSERVICESNODEPORT_RESOURCE,
- parent_resource_id=lsn_id,
- fields="uuid",
- filters=filters),
- cluster=cluster)['results']
+ results = nsxlib.do_request(HTTP_GET,
+ nsxlib._build_uri_path(
+ LSERVICESNODEPORT_RESOURCE,
+ parent_resource_id=lsn_id,
+ fields="uuid",
+ filters=filters),
+ cluster=cluster)['results']
if not results:
raise exception.NotFound()
elif len(results) == 1:
def lsn_port_info_get(cluster, lsn_id, lsn_port_id):
- result = do_request(HTTP_GET,
- _build_uri_path(LSERVICESNODEPORT_RESOURCE,
- parent_resource_id=lsn_id,
- resource_id=lsn_port_id),
- cluster=cluster)
+ result = nsxlib.do_request(HTTP_GET,
+ nsxlib._build_uri_path(
+ LSERVICESNODEPORT_RESOURCE,
+ parent_resource_id=lsn_id,
+ resource_id=lsn_port_id),
+ cluster=cluster)
for tag in result['tags']:
if tag['scope'] == 'n_subnet_id':
result['subnet_id'] = tag['tag']
"peer_port_uuid": lswitch_port_id
}
try:
- do_request(HTTP_PUT,
- _build_uri_path(LSERVICESNODEPORT_RESOURCE,
- parent_resource_id=lsn_id,
- resource_id=lsn_port_id,
- is_attachment=True),
- json.dumps(patch_obj),
- cluster=cluster)
+ nsxlib.do_request(HTTP_PUT,
+ nsxlib._build_uri_path(LSERVICESNODEPORT_RESOURCE,
+ parent_resource_id=lsn_id,
+ resource_id=lsn_port_id,
+ is_attachment=True),
+ json.dumps(patch_obj),
+ cluster=cluster)
except api_exc.Conflict:
# This restriction might be lifted at some point
msg = (_("Attempt to plug Logical Services Node %(lsn)s into "
cluster, lsn_id, action, is_enabled, obj):
lsn_obj = {"enabled": is_enabled}
lsn_obj.update(obj)
- do_request(HTTP_PUT,
- _build_uri_path(LSERVICESNODE_RESOURCE,
- resource_id=lsn_id,
- extra_action=action),
- json.dumps(lsn_obj),
- cluster=cluster)
+ nsxlib.do_request(HTTP_PUT,
+ nsxlib._build_uri_path(LSERVICESNODE_RESOURCE,
+ resource_id=lsn_id,
+ extra_action=action),
+ json.dumps(lsn_obj),
+ cluster=cluster)
def _lsn_port_configure_action(
cluster, lsn_id, lsn_port_id, action, is_enabled, obj):
- do_request(HTTP_PUT,
- _build_uri_path(LSERVICESNODE_RESOURCE,
- resource_id=lsn_id,
- extra_action=action),
- json.dumps({"enabled": is_enabled}),
- cluster=cluster)
- do_request(HTTP_PUT,
- _build_uri_path(LSERVICESNODEPORT_RESOURCE,
- parent_resource_id=lsn_id,
- resource_id=lsn_port_id,
- extra_action=action),
- json.dumps(obj),
- cluster=cluster)
+ nsxlib.do_request(HTTP_PUT,
+ nsxlib._build_uri_path(LSERVICESNODE_RESOURCE,
+ resource_id=lsn_id,
+ extra_action=action),
+ json.dumps({"enabled": is_enabled}),
+ cluster=cluster)
+ nsxlib.do_request(HTTP_PUT,
+ nsxlib._build_uri_path(LSERVICESNODEPORT_RESOURCE,
+ parent_resource_id=lsn_id,
+ resource_id=lsn_port_id,
+ extra_action=action),
+ json.dumps(obj),
+ cluster=cluster)
def _get_opts(name, value):
def _lsn_port_host_action(
cluster, lsn_id, lsn_port_id, host_obj, extra_action, action):
- do_request(HTTP_POST,
- _build_uri_path(LSERVICESNODEPORT_RESOURCE,
- parent_resource_id=lsn_id,
- resource_id=lsn_port_id,
- extra_action=extra_action,
- filters={"action": action}),
- json.dumps(host_obj),
- cluster=cluster)
+ nsxlib.do_request(HTTP_POST,
+ nsxlib._build_uri_path(LSERVICESNODEPORT_RESOURCE,
+ parent_resource_id=lsn_id,
+ resource_id=lsn_port_id,
+ extra_action=extra_action,
+ filters={"action": action}),
+ json.dumps(host_obj),
+ cluster=cluster)
def lsn_port_dhcp_host_add(cluster, lsn_id, lsn_port_id, host_data):
from neutron.openstack.common import log
from neutron.plugins.vmware.api_client import exception as api_exc
from neutron.plugins.vmware.common import utils
-from neutron.plugins.vmware.nsxlib import _build_uri_path
-from neutron.plugins.vmware.nsxlib import do_request
+from neutron.plugins.vmware import nsxlib
HTTP_POST = "POST"
HTTP_DELETE = "DELETE"
queue_obj['tags'] = utils.get_tags()
try:
- return do_request(HTTP_POST,
- _build_uri_path(LQUEUE_RESOURCE),
- jsonutils.dumps(queue_obj),
- cluster=cluster)['uuid']
+ return nsxlib.do_request(HTTP_POST,
+ nsxlib._build_uri_path(LQUEUE_RESOURCE),
+ jsonutils.dumps(queue_obj),
+ cluster=cluster)['uuid']
except api_exc.NsxApiException:
# FIXME(salv-orlando): This should not raise NeutronException
with excutils.save_and_reraise_exception():
def delete_lqueue(cluster, queue_id):
try:
- do_request(HTTP_DELETE,
- _build_uri_path(LQUEUE_RESOURCE,
- resource_id=queue_id),
- cluster=cluster)
+ nsxlib.do_request(HTTP_DELETE,
+ nsxlib._build_uri_path(LQUEUE_RESOURCE,
+ resource_id=queue_id),
+ cluster=cluster)
except Exception:
# FIXME(salv-orlando): This should not raise NeutronException
with excutils.save_and_reraise_exception():
from neutron.plugins.vmware.api_client import exception as api_exc
from neutron.plugins.vmware.common import exceptions as nsx_exc
from neutron.plugins.vmware.common import utils
-from neutron.plugins.vmware.nsxlib import _build_uri_path
-from neutron.plugins.vmware.nsxlib import do_request
-from neutron.plugins.vmware.nsxlib import get_all_query_pages
-from neutron.plugins.vmware.nsxlib.switch import get_port
-from neutron.plugins.vmware.nsxlib.versioning import DEFAULT_VERSION
-from neutron.plugins.vmware.nsxlib.versioning import versioned
+from neutron.plugins.vmware import nsxlib
+from neutron.plugins.vmware.nsxlib import switch
+from neutron.plugins.vmware.nsxlib import versioning
HTTP_GET = "GET"
HTTP_POST = "POST"
"SingleDefaultRouteImplicitRoutingConfig",
distributed=distributed,
**implicit_routing_config)
- return do_request(HTTP_POST, _build_uri_path(LROUTER_RESOURCE),
- jsonutils.dumps(lrouter_obj), cluster=cluster)
+ return nsxlib.do_request(HTTP_POST,
+ nsxlib._build_uri_path(LROUTER_RESOURCE),
+ jsonutils.dumps(lrouter_obj), cluster=cluster)
def create_implicit_routing_lrouter(cluster, neutron_router_id, tenant_id,
lrouter_obj = _prepare_lrouter_body(
display_name, neutron_router_id, tenant_id,
"RoutingTableRoutingConfig", distributed=distributed)
- router = do_request(HTTP_POST, _build_uri_path(LROUTER_RESOURCE),
- jsonutils.dumps(lrouter_obj), cluster=cluster)
+ router = nsxlib.do_request(HTTP_POST,
+ nsxlib._build_uri_path(LROUTER_RESOURCE),
+ jsonutils.dumps(lrouter_obj), cluster=cluster)
default_gw = {'prefix': '0.0.0.0/0', 'next_hop_ip': nexthop}
create_explicit_route_lrouter(cluster, router['uuid'], default_gw)
return router
def delete_lrouter(cluster, lrouter_id):
- do_request(HTTP_DELETE, _build_uri_path(LROUTER_RESOURCE,
- resource_id=lrouter_id),
- cluster=cluster)
+ nsxlib.do_request(HTTP_DELETE,
+ nsxlib._build_uri_path(LROUTER_RESOURCE,
+ resource_id=lrouter_id),
+ cluster=cluster)
def get_lrouter(cluster, lrouter_id):
- return do_request(HTTP_GET,
- _build_uri_path(LROUTER_RESOURCE,
- resource_id=lrouter_id,
- relations='LogicalRouterStatus'),
- cluster=cluster)
+ return nsxlib.do_request(HTTP_GET,
+ nsxlib._build_uri_path(
+ LROUTER_RESOURCE,
+ resource_id=lrouter_id,
+ relations='LogicalRouterStatus'),
+ cluster=cluster)
def query_lrouters(cluster, fields=None, filters=None):
- return get_all_query_pages(
- _build_uri_path(LROUTER_RESOURCE,
- fields=fields,
- relations='LogicalRouterStatus',
- filters=filters),
+ return nsxlib.get_all_query_pages(
+ nsxlib._build_uri_path(LROUTER_RESOURCE,
+ fields=fields,
+ relations='LogicalRouterStatus',
+ filters=filters),
cluster)
"default_route_next_hop")
if nh_element:
nh_element["gateway_ip_address"] = nexthop
- return do_request(HTTP_PUT, _build_uri_path(LROUTER_RESOURCE,
- resource_id=r_id),
- jsonutils.dumps(lrouter_obj),
- cluster=cluster)
+ return nsxlib.do_request(HTTP_PUT,
+ nsxlib._build_uri_path(LROUTER_RESOURCE,
+ resource_id=r_id),
+ jsonutils.dumps(lrouter_obj),
+ cluster=cluster)
def get_explicit_routes_lrouter(cluster, router_id, protocol_type='static'):
static_filter = {'protocol': protocol_type}
- existing_routes = do_request(
+ existing_routes = nsxlib.do_request(
HTTP_GET,
- _build_uri_path(LROUTERRIB_RESOURCE,
- filters=static_filter,
- fields="*",
- parent_resource_id=router_id),
+ nsxlib._build_uri_path(LROUTERRIB_RESOURCE,
+ filters=static_filter,
+ fields="*",
+ parent_resource_id=router_id),
cluster=cluster)['results']
return existing_routes
def delete_explicit_route_lrouter(cluster, router_id, route_id):
- do_request(HTTP_DELETE,
- _build_uri_path(LROUTERRIB_RESOURCE,
- resource_id=route_id,
- parent_resource_id=router_id),
- cluster=cluster)
+ nsxlib.do_request(HTTP_DELETE,
+ nsxlib._build_uri_path(LROUTERRIB_RESOURCE,
+ resource_id=route_id,
+ parent_resource_id=router_id),
+ cluster=cluster)
def create_explicit_route_lrouter(cluster, router_id, route):
next_hop_ip = route.get("nexthop") or route.get("next_hop_ip")
prefix = route.get("destination") or route.get("prefix")
- uuid = do_request(
+ uuid = nsxlib.do_request(
HTTP_POST,
- _build_uri_path(LROUTERRIB_RESOURCE,
- parent_resource_id=router_id),
+ nsxlib._build_uri_path(LROUTERRIB_RESOURCE,
+ parent_resource_id=router_id),
jsonutils.dumps({
"action": "accept",
"next_hop_ip": next_hop_ip,
def get_default_route_explicit_routing_lrouter_v33(cluster, router_id):
static_filter = {"protocol": "static",
"prefix": "0.0.0.0/0"}
- default_route = do_request(
+ default_route = nsxlib.do_request(
HTTP_GET,
- _build_uri_path(LROUTERRIB_RESOURCE,
- filters=static_filter,
- fields="*",
- parent_resource_id=router_id),
+ nsxlib._build_uri_path(LROUTERRIB_RESOURCE,
+ filters=static_filter,
+ fields="*",
+ parent_resource_id=router_id),
cluster=cluster)["results"][0]
return default_route
"next_hop_ip": next_hop,
"prefix": "0.0.0.0/0",
"protocol": "static"}
- do_request(HTTP_PUT,
- _build_uri_path(LROUTERRIB_RESOURCE,
- resource_id=default_route['uuid'],
- parent_resource_id=router_id),
- jsonutils.dumps(new_default_route),
- cluster=cluster)
+ nsxlib.do_request(HTTP_PUT,
+ nsxlib._build_uri_path(
+ LROUTERRIB_RESOURCE,
+ resource_id=default_route['uuid'],
+ parent_resource_id=router_id),
+ jsonutils.dumps(new_default_route),
+ cluster=cluster)
def update_explicit_routing_lrouter(cluster, router_id,
def query_lrouter_lports(cluster, lr_uuid, fields="*",
filters=None, relations=None):
- uri = _build_uri_path(LROUTERPORT_RESOURCE, parent_resource_id=lr_uuid,
- fields=fields, filters=filters, relations=relations)
- return do_request(HTTP_GET, uri, cluster=cluster)['results']
+ uri = nsxlib._build_uri_path(LROUTERPORT_RESOURCE,
+ parent_resource_id=lr_uuid,
+ fields=fields, filters=filters,
+ relations=relations)
+ return nsxlib.do_request(HTTP_GET, uri, cluster=cluster)['results']
def create_router_lport(cluster, lrouter_uuid, tenant_id, neutron_port_id,
# when creating the fake_ext_gw there is no mac_address present.
if mac_address:
lport_obj['mac_address'] = mac_address
- path = _build_uri_path(LROUTERPORT_RESOURCE,
- parent_resource_id=lrouter_uuid)
- result = do_request(HTTP_POST, path, jsonutils.dumps(lport_obj),
- cluster=cluster)
+ path = nsxlib._build_uri_path(LROUTERPORT_RESOURCE,
+ parent_resource_id=lrouter_uuid)
+ result = nsxlib.do_request(HTTP_POST, path, jsonutils.dumps(lport_obj),
+ cluster=cluster)
LOG.debug(_("Created logical port %(lport_uuid)s on "
"logical router %(lrouter_uuid)s"),
for key in lport_obj.keys():
if lport_obj[key] is None:
del lport_obj[key]
- path = _build_uri_path(LROUTERPORT_RESOURCE,
- lrouter_port_uuid,
- parent_resource_id=lrouter_uuid)
- result = do_request(HTTP_PUT, path,
- jsonutils.dumps(lport_obj),
- cluster=cluster)
+ path = nsxlib._build_uri_path(LROUTERPORT_RESOURCE,
+ lrouter_port_uuid,
+ parent_resource_id=lrouter_uuid)
+ result = nsxlib.do_request(HTTP_PUT, path,
+ jsonutils.dumps(lport_obj),
+ cluster=cluster)
LOG.debug(_("Updated logical port %(lport_uuid)s on "
"logical router %(lrouter_uuid)s"),
{'lport_uuid': lrouter_port_uuid, 'lrouter_uuid': lrouter_uuid})
def delete_router_lport(cluster, lrouter_uuid, lport_uuid):
"""Creates a logical port on the assigned logical router."""
- path = _build_uri_path(LROUTERPORT_RESOURCE, lport_uuid, lrouter_uuid)
- do_request(HTTP_DELETE, path, cluster=cluster)
+ path = nsxlib._build_uri_path(LROUTERPORT_RESOURCE, lport_uuid,
+ lrouter_uuid)
+ nsxlib.do_request(HTTP_DELETE, path, cluster=cluster)
LOG.debug(_("Delete logical router port %(lport_uuid)s on "
"logical router %(lrouter_uuid)s"),
{'lport_uuid': lport_uuid,
def delete_peer_router_lport(cluster, lr_uuid, ls_uuid, lp_uuid):
- nsx_port = get_port(cluster, ls_uuid, lp_uuid,
- relations="LogicalPortAttachment")
+ nsx_port = switch.get_port(cluster, ls_uuid, lp_uuid,
+ relations="LogicalPortAttachment")
relations = nsx_port.get('_relations')
if relations:
att_data = relations.get('LogicalPortAttachment')
- L3GatewayAttachment [-> L3GatewayService uuid]
For the latter attachment type a VLAN ID can be specified as well.
"""
- uri = _build_uri_path(LROUTERPORT_RESOURCE, port_id, router_id,
- is_attachment=True)
+ uri = nsxlib._build_uri_path(LROUTERPORT_RESOURCE, port_id, router_id,
+ is_attachment=True)
attach_obj = {}
attach_obj["type"] = nsx_attachment_type
if nsx_attachment_type == "PatchAttachment":
else:
raise nsx_exc.InvalidAttachmentType(
attachment_type=nsx_attachment_type)
- return do_request(
+ return nsxlib.do_request(
HTTP_PUT, uri, jsonutils.dumps(attach_obj), cluster=cluster)
def _create_lrouter_nat_rule(cluster, router_id, nat_rule_obj):
LOG.debug(_("Creating NAT rule: %s"), nat_rule_obj)
- uri = _build_uri_path(LROUTERNAT_RESOURCE, parent_resource_id=router_id)
- return do_request(HTTP_POST, uri, jsonutils.dumps(nat_rule_obj),
- cluster=cluster)
+ uri = nsxlib._build_uri_path(LROUTERNAT_RESOURCE,
+ parent_resource_id=router_id)
+ return nsxlib.do_request(HTTP_POST, uri, jsonutils.dumps(nat_rule_obj),
+ cluster=cluster)
def _build_snat_rule_obj(min_src_ip, max_src_ip, nat_match_obj):
def delete_router_nat_rule(cluster, router_id, rule_id):
- uri = _build_uri_path(LROUTERNAT_RESOURCE, rule_id, router_id)
- do_request(HTTP_DELETE, uri, cluster=cluster)
+ uri = nsxlib._build_uri_path(LROUTERNAT_RESOURCE, rule_id, router_id)
+ nsxlib.do_request(HTTP_DELETE, uri, cluster=cluster)
def query_nat_rules(cluster, router_id, fields="*", filters=None):
- uri = _build_uri_path(LROUTERNAT_RESOURCE, parent_resource_id=router_id,
- fields=fields, filters=filters)
- return get_all_query_pages(uri, cluster)
+ uri = nsxlib._build_uri_path(LROUTERNAT_RESOURCE,
+ parent_resource_id=router_id,
+ fields=fields, filters=filters)
+ return nsxlib.get_all_query_pages(uri, cluster)
# NOTE(salvatore-orlando): The following FIXME applies in general to
# FIXME(salvatore-orlando): need a lock around the list of IPs on an iface
def update_lrouter_port_ips(cluster, lrouter_id, lport_id,
ips_to_add, ips_to_remove):
- uri = _build_uri_path(LROUTERPORT_RESOURCE, lport_id, lrouter_id)
+ uri = nsxlib._build_uri_path(LROUTERPORT_RESOURCE, lport_id, lrouter_id)
try:
- port = do_request(HTTP_GET, uri, cluster=cluster)
+ port = nsxlib.do_request(HTTP_GET, uri, cluster=cluster)
# TODO(salvatore-orlando): Enforce ips_to_add intersection with
# ips_to_remove is empty
ip_address_set = set(port['ip_addresses'])
ip_address_set = ip_address_set | set(ips_to_add)
# Set is not JSON serializable - convert to list
port['ip_addresses'] = list(ip_address_set)
- do_request(HTTP_PUT, uri, jsonutils.dumps(port), cluster=cluster)
+ nsxlib.do_request(HTTP_PUT, uri, jsonutils.dumps(port),
+ cluster=cluster)
except exception.NotFound:
# FIXME(salv-orlando):avoid raising different exception
data = {'lport_id': lport_id, 'lrouter_id': lrouter_id}
ROUTER_FUNC_DICT = {
'create_lrouter': {
- 2: {DEFAULT_VERSION: create_implicit_routing_lrouter, },
- 3: {DEFAULT_VERSION: create_implicit_routing_lrouter,
+ 2: {versioning.DEFAULT_VERSION: create_implicit_routing_lrouter, },
+ 3: {versioning.DEFAULT_VERSION: create_implicit_routing_lrouter,
1: create_implicit_routing_lrouter_with_distribution,
2: create_explicit_routing_lrouter, }, },
'update_lrouter': {
- 2: {DEFAULT_VERSION: update_implicit_routing_lrouter, },
- 3: {DEFAULT_VERSION: update_implicit_routing_lrouter,
+ 2: {versioning.DEFAULT_VERSION: update_implicit_routing_lrouter, },
+ 3: {versioning.DEFAULT_VERSION: update_implicit_routing_lrouter,
2: update_explicit_routing_lrouter, }, },
'create_lrouter_dnat_rule': {
- 2: {DEFAULT_VERSION: create_lrouter_dnat_rule_v2, },
- 3: {DEFAULT_VERSION: create_lrouter_dnat_rule_v3, }, },
+ 2: {versioning.DEFAULT_VERSION: create_lrouter_dnat_rule_v2, },
+ 3: {versioning.DEFAULT_VERSION: create_lrouter_dnat_rule_v3, }, },
'create_lrouter_snat_rule': {
- 2: {DEFAULT_VERSION: create_lrouter_snat_rule_v2, },
- 3: {DEFAULT_VERSION: create_lrouter_snat_rule_v3, }, },
+ 2: {versioning.DEFAULT_VERSION: create_lrouter_snat_rule_v2, },
+ 3: {versioning.DEFAULT_VERSION: create_lrouter_snat_rule_v3, }, },
'create_lrouter_nosnat_rule': {
- 2: {DEFAULT_VERSION: create_lrouter_nosnat_rule_v2, },
- 3: {DEFAULT_VERSION: create_lrouter_nosnat_rule_v3, }, },
+ 2: {versioning.DEFAULT_VERSION: create_lrouter_nosnat_rule_v2, },
+ 3: {versioning.DEFAULT_VERSION: create_lrouter_nosnat_rule_v3, }, },
'create_lrouter_nodnat_rule': {
- 2: {DEFAULT_VERSION: create_lrouter_nodnat_rule_v2, },
- 3: {DEFAULT_VERSION: create_lrouter_nodnat_rule_v3, }, },
+ 2: {versioning.DEFAULT_VERSION: create_lrouter_nodnat_rule_v2, },
+ 3: {versioning.DEFAULT_VERSION: create_lrouter_nodnat_rule_v3, }, },
'get_default_route_explicit_routing_lrouter': {
- 3: {DEFAULT_VERSION: get_default_route_explicit_routing_lrouter_v32,
+ 3: {versioning.DEFAULT_VERSION:
+ get_default_route_explicit_routing_lrouter_v32,
2: get_default_route_explicit_routing_lrouter_v32, }, },
}
-@versioned(ROUTER_FUNC_DICT)
+@versioning.versioned(ROUTER_FUNC_DICT)
def create_lrouter(cluster, *args, **kwargs):
if kwargs.get('distributed', None):
v = cluster.api_client.get_version()
return v
-@versioned(ROUTER_FUNC_DICT)
+@versioning.versioned(ROUTER_FUNC_DICT)
def get_default_route_explicit_routing_lrouter(cluster, *args, **kwargs):
pass
-@versioned(ROUTER_FUNC_DICT)
+@versioning.versioned(ROUTER_FUNC_DICT)
def update_lrouter(cluster, *args, **kwargs):
if kwargs.get('routes', None):
v = cluster.api_client.get_version()
return v
-@versioned(ROUTER_FUNC_DICT)
+@versioning.versioned(ROUTER_FUNC_DICT)
def create_lrouter_dnat_rule(cluster, *args, **kwargs):
pass
-@versioned(ROUTER_FUNC_DICT)
+@versioning.versioned(ROUTER_FUNC_DICT)
def create_lrouter_snat_rule(cluster, *args, **kwargs):
pass
-@versioned(ROUTER_FUNC_DICT)
+@versioning.versioned(ROUTER_FUNC_DICT)
def create_lrouter_nosnat_rule(cluster, *args, **kwargs):
pass
-@versioned(ROUTER_FUNC_DICT)
+@versioning.versioned(ROUTER_FUNC_DICT)
def create_lrouter_nodnat_rule(cluster, *args, **kwargs):
pass
from neutron.openstack.common import excutils
from neutron.openstack.common import log
from neutron.plugins.vmware.common import utils
-from neutron.plugins.vmware.nsxlib import _build_uri_path
-from neutron.plugins.vmware.nsxlib import do_request
-from neutron.plugins.vmware.nsxlib import format_exception
-from neutron.plugins.vmware.nsxlib import get_all_query_pages
+from neutron.plugins.vmware import nsxlib
HTTP_GET = "GET"
HTTP_POST = "POST"
def query_security_profiles(cluster, fields=None, filters=None):
- return get_all_query_pages(
- _build_uri_path(SECPROF_RESOURCE,
- fields=fields,
- filters=filters),
+ return nsxlib.get_all_query_pages(
+ nsxlib._build_uri_path(SECPROF_RESOURCE,
+ fields=fields,
+ filters=filters),
cluster)
hidden_rules['logical_port_ingress_rules']),
logical_port_egress_rules=hidden_rules['logical_port_egress_rules']
)
- rsp = do_request(HTTP_POST, path, body, cluster=cluster)
+ rsp = nsxlib.do_request(HTTP_POST, path, body, cluster=cluster)
if security_profile.get('name') == 'default':
# If security group is default allow ip traffic between
# members of the same security profile is allowed and ingress traffic
body = mk_body(
logical_port_ingress_rules=rules['logical_port_ingress_rules'],
logical_port_egress_rules=rules['logical_port_egress_rules'])
- rsp = do_request(HTTP_PUT, path, body, cluster=cluster)
+ rsp = nsxlib.do_request(HTTP_PUT, path, body, cluster=cluster)
except exceptions.NotFound as e:
- LOG.error(format_exception("Unknown", e, locals()))
+ LOG.error(nsxlib.format_exception("Unknown", e, locals()))
#FIXME(salvatore-orlando): This should not raise NeutronException
raise exceptions.NeutronException()
LOG.debug(_("Updated Security Profile: %s"), rsp)
def update_security_profile(cluster, spid, name):
- return do_request(HTTP_PUT,
- _build_uri_path(SECPROF_RESOURCE, resource_id=spid),
- json.dumps({
- "display_name": utils.check_and_truncate(name)
- }),
- cluster=cluster)
+ return nsxlib.do_request(HTTP_PUT,
+ nsxlib._build_uri_path(SECPROF_RESOURCE,
+ resource_id=spid),
+ json.dumps({
+ "display_name": utils.check_and_truncate(name)
+ }),
+ cluster=cluster)
def delete_security_profile(cluster, spid):
path = "/ws.v1/security-profile/%s" % spid
try:
- do_request(HTTP_DELETE, path, cluster=cluster)
+ nsxlib.do_request(HTTP_DELETE, path, cluster=cluster)
except exceptions.NotFound:
with excutils.save_and_reraise_exception():
# This is not necessarily an error condition
from neutron.plugins.vmware.api_client import exception as api_exc
from neutron.plugins.vmware.common import exceptions as nsx_exc
from neutron.plugins.vmware.common import utils
-from neutron.plugins.vmware.nsxlib import _build_uri_path
-from neutron.plugins.vmware.nsxlib import do_request
-from neutron.plugins.vmware.nsxlib import get_all_query_pages
+from neutron.plugins.vmware import nsxlib
HTTP_GET = "GET"
HTTP_POST = "POST"
def get_lswitch_by_id(cluster, lswitch_id):
try:
- lswitch_uri_path = _build_uri_path(
+ lswitch_uri_path = nsxlib._build_uri_path(
LSWITCH_RESOURCE, lswitch_id,
relations="LogicalSwitchStatus")
- return do_request(HTTP_GET, lswitch_uri_path, cluster=cluster)
+ return nsxlib.do_request(HTTP_GET, lswitch_uri_path, cluster=cluster)
except exception.NotFound:
# FIXME(salv-orlando): this should not raise a neutron exception
raise exception.NetworkNotFound(net_id=lswitch_id)
def lookup_switches_by_tag():
# Fetch extra logical switches
- lswitch_query_path = _build_uri_path(
+ lswitch_query_path = nsxlib._build_uri_path(
LSWITCH_RESOURCE,
fields="uuid,display_name,tags,lport_count",
relations="LogicalSwitchStatus",
filters={'tag': neutron_net_id,
'tag_scope': 'quantum_net_id'})
- return get_all_query_pages(lswitch_query_path, cluster)
+ return nsxlib.get_all_query_pages(lswitch_query_path, cluster)
- lswitch_uri_path = _build_uri_path(LSWITCH_RESOURCE, neutron_net_id,
- relations="LogicalSwitchStatus")
+ lswitch_uri_path = nsxlib._build_uri_path(LSWITCH_RESOURCE, neutron_net_id,
+ relations="LogicalSwitchStatus")
results = []
try:
- ls = do_request(HTTP_GET, lswitch_uri_path, cluster=cluster)
+ ls = nsxlib.do_request(HTTP_GET, lswitch_uri_path, cluster=cluster)
results.append(ls)
for tag in ls['tags']:
if (tag['scope'] == "multi_lswitch" and
"scope": "shared"})
if "tags" in kwargs:
lswitch_obj["tags"].extend(kwargs["tags"])
- uri = _build_uri_path(LSWITCH_RESOURCE)
- lswitch = do_request(HTTP_POST, uri, json.dumps(lswitch_obj),
- cluster=cluster)
+ uri = nsxlib._build_uri_path(LSWITCH_RESOURCE)
+ lswitch = nsxlib.do_request(HTTP_POST, uri, json.dumps(lswitch_obj),
+ cluster=cluster)
LOG.debug(_("Created logical switch: %s"), lswitch['uuid'])
return lswitch
def update_lswitch(cluster, lswitch_id, display_name,
tenant_id=None, **kwargs):
- uri = _build_uri_path(LSWITCH_RESOURCE, resource_id=lswitch_id)
+ uri = nsxlib._build_uri_path(LSWITCH_RESOURCE, resource_id=lswitch_id)
lswitch_obj = {"display_name": utils.check_and_truncate(display_name),
"tags": utils.get_tags(os_tid=tenant_id)}
if "tags" in kwargs:
lswitch_obj["tags"].extend(kwargs["tags"])
try:
- return do_request(HTTP_PUT, uri, json.dumps(lswitch_obj),
- cluster=cluster)
+ return nsxlib.do_request(HTTP_PUT, uri, json.dumps(lswitch_obj),
+ cluster=cluster)
except exception.NotFound as e:
LOG.error(_("Network not found, Error: %s"), str(e))
raise exception.NetworkNotFound(net_id=lswitch_id)
for ls_id in lswitch_ids:
path = "/ws.v1/lswitch/%s" % ls_id
try:
- do_request(HTTP_DELETE, path, cluster=cluster)
+ nsxlib.do_request(HTTP_DELETE, path, cluster=cluster)
except exception.NotFound as e:
LOG.error(_("Network not found, Error: %s"), str(e))
raise exception.NetworkNotFound(net_id=ls_id)
if filters and "attachment" in filters:
filters['attachment_vif_uuid'] = filters["attachment"]
del filters['attachment']
- uri = _build_uri_path(LSWITCHPORT_RESOURCE, parent_resource_id=ls_uuid,
- fields=fields, filters=filters, relations=relations)
- return do_request(HTTP_GET, uri, cluster=cluster)['results']
+ uri = nsxlib._build_uri_path(LSWITCHPORT_RESOURCE,
+ parent_resource_id=ls_uuid,
+ fields=fields,
+ filters=filters,
+ relations=relations)
+ return nsxlib.do_request(HTTP_GET, uri, cluster=cluster)['results']
def delete_port(cluster, switch, port):
uri = "/ws.v1/lswitch/" + switch + "/lport/" + port
try:
- do_request(HTTP_DELETE, uri, cluster=cluster)
+ nsxlib.do_request(HTTP_DELETE, uri, cluster=cluster)
except exception.NotFound:
LOG.exception(_("Port or Network not found"))
raise exception.PortNotFoundOnNetwork(
# call. In release L-** or M-**, we might want to swap the calls
# as it's likely that ports with the new tag would outnumber the
# ones with the old tag
- ports = get_all_query_pages(lport_query_path_obsolete, cluster)
+ ports = nsxlib.get_all_query_pages(lport_query_path_obsolete,
+ cluster)
if not ports:
- ports = get_all_query_pages(lport_query_path, cluster)
+ ports = nsxlib.get_all_query_pages(lport_query_path, cluster)
except exception.NotFound:
LOG.warn(_("Lswitch %s not found in NSX"), lswitch)
ports = None
Returns the NSX UUID of the logical port with tag q_port_id equal to
neutron_port_id or None if the port is not Found.
"""
- uri = _build_uri_path(LSWITCHPORT_RESOURCE,
- parent_resource_id=lswitch_uuid,
- fields='uuid',
- filters={'tag': neutron_port_id,
- 'tag_scope': 'q_port_id'})
+ uri = nsxlib._build_uri_path(LSWITCHPORT_RESOURCE,
+ parent_resource_id=lswitch_uuid,
+ fields='uuid',
+ filters={'tag': neutron_port_id,
+ 'tag_scope': 'q_port_id'})
LOG.debug(_("Looking for port with q_port_id tag '%(neutron_port_id)s' "
"on: '%(lswitch_uuid)s'"),
{'neutron_port_id': neutron_port_id,
'lswitch_uuid': lswitch_uuid})
- res = do_request(HTTP_GET, uri, cluster=cluster)
+ res = nsxlib.do_request(HTTP_GET, uri, cluster=cluster)
num_results = len(res["results"])
if num_results >= 1:
if num_results > 1:
if relations:
uri += "relations=%s" % relations
try:
- return do_request(HTTP_GET, uri, cluster=cluster)
+ return nsxlib.do_request(HTTP_GET, uri, cluster=cluster)
except exception.NotFound as e:
LOG.error(_("Port or Network not found, Error: %s"), str(e))
raise exception.PortNotFoundOnNetwork(
path = "/ws.v1/lswitch/" + lswitch_uuid + "/lport/" + lport_uuid
try:
- result = do_request(HTTP_PUT, path, json.dumps(lport_obj),
- cluster=cluster)
+ result = nsxlib.do_request(HTTP_PUT, path, json.dumps(lport_obj),
+ cluster=cluster)
LOG.debug(_("Updated logical port %(result)s "
"on logical switch %(uuid)s"),
{'result': result['uuid'], 'uuid': lswitch_uuid})
queue_id, mac_learning_enabled,
allowed_address_pairs)
- path = _build_uri_path(LSWITCHPORT_RESOURCE,
- parent_resource_id=lswitch_uuid)
- result = do_request(HTTP_POST, path, json.dumps(lport_obj),
- cluster=cluster)
+ path = nsxlib._build_uri_path(LSWITCHPORT_RESOURCE,
+ parent_resource_id=lswitch_uuid)
+ result = nsxlib.do_request(HTTP_POST, path, json.dumps(lport_obj),
+ cluster=cluster)
LOG.debug(_("Created logical port %(result)s on logical switch %(uuid)s"),
{'result': result['uuid'], 'uuid': lswitch_uuid})
def get_port_status(cluster, lswitch_id, port_id):
"""Retrieve the operational status of the port."""
try:
- r = do_request(HTTP_GET,
- "/ws.v1/lswitch/%s/lport/%s/status" %
- (lswitch_id, port_id), cluster=cluster)
+ r = nsxlib.do_request(HTTP_GET,
+ "/ws.v1/lswitch/%s/lport/%s/status" %
+ (lswitch_id, port_id), cluster=cluster)
except exception.NotFound as e:
LOG.error(_("Port not found, Error: %s"), str(e))
raise exception.PortNotFoundOnNetwork(
def plug_interface(cluster, lswitch_id, lport_id, att_obj):
- return do_request(HTTP_PUT,
- _build_uri_path(LSWITCHPORT_RESOURCE,
- lport_id, lswitch_id,
- is_attachment=True),
- json.dumps(att_obj),
- cluster=cluster)
+ return nsxlib.do_request(HTTP_PUT,
+ nsxlib._build_uri_path(LSWITCHPORT_RESOURCE,
+ lport_id, lswitch_id,
+ is_attachment=True),
+ json.dumps(att_obj),
+ cluster=cluster)
def plug_vif_interface(
from neutron.plugins.vmware.common import nsx_utils
from neutron.plugins.vmware.common import securitygroups as sg_utils
from neutron.plugins.vmware.common import sync
-from neutron.plugins.vmware.common.utils import NetworkTypes
+from neutron.plugins.vmware.common import utils as c_utils
from neutron.plugins.vmware.dbexts import db as nsx_db
from neutron.plugins.vmware.dbexts import distributedrouter as dist_rtr
from neutron.plugins.vmware.dbexts import maclearning as mac_db
max_ports = self.nsx_opts.max_lp_per_overlay_ls
allow_extra_lswitches = False
for network_binding in network_bindings:
- if network_binding.binding_type in (NetworkTypes.FLAT,
- NetworkTypes.VLAN):
+ if network_binding.binding_type in (c_utils.NetworkTypes.FLAT,
+ c_utils.NetworkTypes.VLAN):
max_ports = self.nsx_opts.max_lp_per_bridged_ls
allow_extra_lswitches = True
break
True,
ip_addresses)
ext_network = self.get_network(context, port_data['network_id'])
- if ext_network.get(pnet.NETWORK_TYPE) == NetworkTypes.L3_EXT:
+ if ext_network.get(pnet.NETWORK_TYPE) == c_utils.NetworkTypes.L3_EXT:
# Update attachment
physical_network = (ext_network[pnet.PHYSICAL_NETWORK] or
self.cluster.default_l3_gw_service_uuid)
err_msg = None
if not network_type_set:
err_msg = _("%s required") % pnet.NETWORK_TYPE
- elif network_type in (NetworkTypes.GRE, NetworkTypes.STT,
- NetworkTypes.FLAT):
+ elif network_type in (c_utils.NetworkTypes.GRE,
+ c_utils.NetworkTypes.STT,
+ c_utils.NetworkTypes.FLAT):
if segmentation_id_set:
err_msg = _("Segmentation ID cannot be specified with "
"flat network type")
- elif network_type == NetworkTypes.VLAN:
+ elif network_type == c_utils.NetworkTypes.VLAN:
if not segmentation_id_set:
err_msg = _("Segmentation ID must be specified with "
"vlan network type")
raise n_exc.VlanIdInUse(
vlan_id=segmentation_id,
physical_network=physical_network)
- elif network_type == NetworkTypes.L3_EXT:
+ elif network_type == c_utils.NetworkTypes.L3_EXT:
if (segmentation_id_set and
not utils.is_valid_vlan_tag(segmentation_id)):
err_msg = (_("%(segmentation_id)s out of range "
if bindings:
transport_entry = {}
for binding in bindings:
- if binding.binding_type in [NetworkTypes.FLAT,
- NetworkTypes.VLAN]:
- transport_entry['transport_type'] = NetworkTypes.BRIDGE
+ if binding.binding_type in [c_utils.NetworkTypes.FLAT,
+ c_utils.NetworkTypes.VLAN]:
+ transport_entry['transport_type'] = (
+ c_utils.NetworkTypes.BRIDGE)
transport_entry['binding_config'] = {}
vlan_id = binding.vlan_id
if vlan_id:
transport_entry = {}
transport_type = transport_zone.get(pnet.NETWORK_TYPE)
- if transport_type in [NetworkTypes.FLAT, NetworkTypes.VLAN]:
- transport_entry['transport_type'] = NetworkTypes.BRIDGE
+ if transport_type in [c_utils.NetworkTypes.FLAT,
+ c_utils.NetworkTypes.VLAN]:
+ transport_entry['transport_type'] = c_utils.NetworkTypes.BRIDGE
transport_entry['binding_config'] = {}
vlan_id = transport_zone.get(pnet.SEGMENTATION_ID)
if vlan_id:
from neutron.plugins.vmware.nsxlib import switch as switchlib
from neutron.plugins.vmware.plugins import base
from neutron.plugins.vmware.vshield.common import constants as vcns_const
-from neutron.plugins.vmware.vshield.common.constants import RouterStatus
from neutron.plugins.vmware.vshield.common import exceptions
-from neutron.plugins.vmware.vshield.tasks.constants import TaskState
-from neutron.plugins.vmware.vshield.tasks.constants import TaskStatus
+from neutron.plugins.vmware.vshield.tasks import constants as tasks_const
from neutron.plugins.vmware.vshield import vcns_driver
from sqlalchemy.orm import exc as sa_exc
]
ROUTER_STATUS_LEVEL = {
- service_constants.ACTIVE: RouterStatus.ROUTER_STATUS_ACTIVE,
- service_constants.DOWN: RouterStatus.ROUTER_STATUS_DOWN,
+ service_constants.ACTIVE: vcns_const.RouterStatus.ROUTER_STATUS_ACTIVE,
+ service_constants.DOWN: vcns_const.RouterStatus.ROUTER_STATUS_DOWN,
service_constants.PENDING_CREATE: (
- RouterStatus.ROUTER_STATUS_PENDING_CREATE
+ vcns_const.RouterStatus.ROUTER_STATUS_PENDING_CREATE
),
service_constants.PENDING_DELETE: (
- RouterStatus.ROUTER_STATUS_PENDING_DELETE
+ vcns_const.RouterStatus.ROUTER_STATUS_PENDING_DELETE
),
- service_constants.ERROR: RouterStatus.ROUTER_STATUS_ERROR
+ service_constants.ERROR: vcns_const.RouterStatus.ROUTER_STATUS_ERROR
}
self.vcns_driver.external_network,
addr, mask, secondary=secondary)
if sync:
- task.wait(TaskState.RESULT)
+ task.wait(tasks_const.TaskState.RESULT)
def _update_router_gw_info(self, context, router_id, info):
if not self._is_advanced_service_router(context, router_id):
lrouter = routerlib.get_lrouter(self.cluster, id)
lr_status = lrouter["_relations"]["LogicalRouterStatus"]
if lr_status["fabric_status"]:
- nsx_status = RouterStatus.ROUTER_STATUS_ACTIVE
+ nsx_status = vcns_const.RouterStatus.ROUTER_STATUS_ACTIVE
else:
- nsx_status = RouterStatus.ROUTER_STATUS_DOWN
+ nsx_status = vcns_const.RouterStatus.ROUTER_STATUS_DOWN
except n_exc.NotFound:
- nsx_status = RouterStatus.ROUTER_STATUS_ERROR
+ nsx_status = vcns_const.RouterStatus.ROUTER_STATUS_ERROR
return nsx_status
if (nsx_lrouter["_relations"]["LogicalRouterStatus"]
["fabric_status"]):
nsx_status[nsx_lrouter['uuid']] = (
- RouterStatus.ROUTER_STATUS_ACTIVE
+ vcns_const.RouterStatus.ROUTER_STATUS_ACTIVE
)
else:
nsx_status[nsx_lrouter['uuid']] = (
- RouterStatus.ROUTER_STATUS_DOWN
+ vcns_const.RouterStatus.ROUTER_STATUS_DOWN
)
return nsx_status
if router_type == ROUTER_TYPE_ADVANCED:
vse_status_level = vse_status_all.get(router['id'])
if vse_status_level is None:
- vse_status_level = RouterStatus.ROUTER_STATUS_ERROR
+ vse_status_level = (
+ vcns_const.RouterStatus.ROUTER_STATUS_ERROR)
if vse_status_level > ROUTER_STATUS_LEVEL[router['status']]:
router['status'] = ROUTER_STATUS[vse_status_level]
# Router might have been deleted before deploy finished
LOG.exception(_("Router %s not found"), lrouter['uuid'])
- if task.status == TaskStatus.COMPLETED:
+ if task.status == tasks_const.TaskStatus.COMPLETED:
LOG.debug(_("Successfully deployed %(edge_id)s for "
"router %(name)s"), {
'edge_id': task.userdata['edge_id'],
jobdata = task.userdata['jobdata']
router_id = task.userdata['router_id']
context = jobdata['context']
- if task.status == TaskStatus.COMPLETED:
+ if task.status == tasks_const.TaskStatus.COMPLETED:
vcns_db.delete_vcns_router_binding(context.session,
router_id)
# under the License.
#
-from neutronclient.neutron.v2_0 import find_resourceid_by_name_or_id
-from neutronclient.neutron.v2_0 import NeutronCommand
+from neutronclient.neutron import v2_0 as client
LSN_PATH = '/lsns'
write_func(_("Port uuids = %s\n\n") % ports)
-class NetworkReport(NeutronCommand):
+class NetworkReport(client.NeutronCommand):
"""Retrieve network migration report."""
def get_parser(self, prog_name):
def run(self, parsed_args):
net = parsed_args.network
- net_id = find_resourceid_by_name_or_id(self.app.client, 'network', net)
+ net_id = client.find_resourceid_by_name_or_id(self.app.client,
+ 'network', net)
res = self.app.client.get("%s/%s" % (LSN_PATH, net_id))
if res:
self.app.stdout.write(_('Migration report is:\n'))
print_report(self.app.stdout.write, res['lsn'])
-class NetworkMigrate(NeutronCommand):
+class NetworkMigrate(client.NeutronCommand):
"""Perform network migration."""
def get_parser(self, prog_name):
def run(self, parsed_args):
net = parsed_args.network
- net_id = find_resourceid_by_name_or_id(self.app.client, 'network', net)
+ net_id = client.find_resourceid_by_name_or_id(self.app.client,
+ 'network', net)
body = {'network': net_id}
res = self.app.client.post(LSN_PATH, body={'lsn': body})
if res:
from neutron.plugins.vmware.common import utils
from neutron.plugins.vmware.vshield.common import (
constants as vcns_const)
-from neutron.plugins.vmware.vshield.common.constants import RouterStatus
+from neutron.plugins.vmware.vshield.common import constants as common_constants
from neutron.plugins.vmware.vshield.common import exceptions
-from neutron.plugins.vmware.vshield.tasks.constants import TaskState
-from neutron.plugins.vmware.vshield.tasks.constants import TaskStatus
+from neutron.plugins.vmware.vshield.tasks import constants
from neutron.plugins.vmware.vshield.tasks import tasks
LOG = logging.getLogger(__name__)
def _edge_status_to_level(self, status):
if status == 'GREEN':
- status_level = RouterStatus.ROUTER_STATUS_ACTIVE
+ status_level = common_constants.RouterStatus.ROUTER_STATUS_ACTIVE
elif status in ('GREY', 'YELLOW'):
- status_level = RouterStatus.ROUTER_STATUS_DOWN
+ status_level = common_constants.RouterStatus.ROUTER_STATUS_DOWN
else:
- status_level = RouterStatus.ROUTER_STATUS_ERROR
+ status_level = common_constants.RouterStatus.ROUTER_STATUS_ERROR
return status_level
def _enable_loadbalancer(self, edge):
except exceptions.VcnsApiException as e:
LOG.exception(_("VCNS: Failed to get edge status:\n%s"),
e.response)
- status_level = RouterStatus.ROUTER_STATUS_ERROR
+ status_level = common_constants.RouterStatus.ROUTER_STATUS_ERROR
try:
desc = jsonutils.loads(e.response)
if desc.get('errorCode') == (
vcns_const.VCNS_ERROR_CODE_EDGE_NOT_RUNNING):
- status_level = RouterStatus.ROUTER_STATUS_DOWN
+ status_level = (
+ common_constants.RouterStatus.ROUTER_STATUS_DOWN)
except ValueError:
LOG.exception(e.response)
LOG.exception(_("VCNS: Failed to update vnic %d"),
config['index'])
- return TaskStatus.COMPLETED
+ return constants.TaskStatus.COMPLETED
def update_interface(self, router_id, edge_id, index, network,
address=None, netmask=None, secondary=None,
edge_id = response['edgeId']
LOG.debug(_("VCNS: deploying edge %s"), edge_id)
userdata['edge_id'] = edge_id
- status = TaskStatus.PENDING
+ status = constants.TaskStatus.PENDING
except exceptions.VcnsApiException:
with excutils.save_and_reraise_exception():
LOG.exception(_("VCNS: deploy edge failed for router %s."),
task.userdata['retries'] = 0
system_status = response.get('systemStatus', None)
if system_status is None:
- status = TaskStatus.PENDING
+ status = constants.TaskStatus.PENDING
elif system_status == 'good':
- status = TaskStatus.COMPLETED
+ status = constants.TaskStatus.COMPLETED
else:
- status = TaskStatus.ERROR
+ status = constants.TaskStatus.ERROR
except exceptions.VcnsApiException:
with excutils.save_and_reraise_exception():
LOG.exception(_("VCNS: Edge %s status query failed."), edge_id)
'edge_id': edge_id,
'retries': retries}
LOG.exception(msg)
- status = TaskStatus.PENDING
+ status = constants.TaskStatus.PENDING
else:
msg = _("VCNS: Unable to retrieve edge %s status. "
"Abort.") % edge_id
LOG.exception(msg)
- status = TaskStatus.ERROR
+ status = constants.TaskStatus.ERROR
LOG.debug(_("VCNS: Edge %s status"), edge_id)
return status
def _result_edge(self, task):
router_name = task.userdata['router_name']
edge_id = task.userdata.get('edge_id')
- if task.status != TaskStatus.COMPLETED:
+ if task.status != constants.TaskStatus.COMPLETED:
LOG.error(_("VCNS: Failed to deploy edge %(edge_id)s "
"for %(name)s, status %(status)d"), {
'edge_id': edge_id,
def _delete_edge(self, task):
edge_id = task.userdata['edge_id']
LOG.debug(_("VCNS: start destroying edge %s"), edge_id)
- status = TaskStatus.COMPLETED
+ status = constants.TaskStatus.COMPLETED
if edge_id:
try:
self.vcns.delete_edge(edge_id)
"%(response)s") % {
'edge_id': edge_id, 'response': e.response}
LOG.exception(msg)
- status = TaskStatus.ERROR
+ status = constants.TaskStatus.ERROR
except Exception:
LOG.exception(_("VCNS: Failed to delete %s"), edge_id)
- status = TaskStatus.ERROR
+ status = constants.TaskStatus.ERROR
return status
if wait_for_exec:
# wait until the deploy task is executed so edge_id is available
- task.wait(TaskState.EXECUTED)
+ task.wait(constants.TaskState.EXECUTED)
return task
try:
self.vcns.update_nat_config(edge_id, nat)
- status = TaskStatus.COMPLETED
+ status = constants.TaskStatus.COMPLETED
except exceptions.VcnsApiException as e:
LOG.exception(_("VCNS: Failed to create snat rule:\n%s"),
e.response)
- status = TaskStatus.ERROR
+ status = constants.TaskStatus.ERROR
return status
'type': addrtype, 'addr': address})
nat = self.get_nat_config(edge_id)
del nat['version']
- status = TaskStatus.COMPLETED
+ status = constants.TaskStatus.COMPLETED
for nat_rule in nat['rules']['natRulesDtos']:
if nat_rule[addrtype] == address:
rule_id = nat_rule['ruleId']
except exceptions.VcnsApiException as e:
LOG.exception(_("VCNS: Failed to delete snat rule:\n"
"%s"), e.response)
- status = TaskStatus.ERROR
+ status = constants.TaskStatus.ERROR
return status
if task != self.updated_task['nat'][edge_id]:
# this task does not have the latest config, abort now
# for speedup
- return TaskStatus.ABORT
+ return constants.TaskStatus.ABORT
rules = task.userdata['rules']
LOG.debug(_("VCNS: start updating nat rules: %s"), rules)
try:
self.vcns.update_nat_config(edge_id, nat)
- status = TaskStatus.COMPLETED
+ status = constants.TaskStatus.COMPLETED
except exceptions.VcnsApiException as e:
LOG.exception(_("VCNS: Failed to create snat rule:\n%s"),
e.response)
- status = TaskStatus.ERROR
+ status = constants.TaskStatus.ERROR
return status
task.userdata.get('skippable', True)):
# this task does not have the latest config, abort now
# for speedup
- return TaskStatus.ABORT
+ return constants.TaskStatus.ABORT
gateway = task.userdata['gateway']
routes = task.userdata['routes']
LOG.debug(_("VCNS: start updating routes for %s"), edge_id)
}
try:
self.vcns.update_routes(edge_id, request)
- status = TaskStatus.COMPLETED
+ status = constants.TaskStatus.COMPLETED
except exceptions.VcnsApiException as e:
LOG.exception(_("VCNS: Failed to update routes:\n%s"),
e.response)
- status = TaskStatus.ERROR
+ status = constants.TaskStatus.ERROR
return status
from neutron.common import exceptions
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
-from neutron.plugins.vmware.vshield.tasks.constants import TaskState
-from neutron.plugins.vmware.vshield.tasks.constants import TaskStatus
+from neutron.plugins.vmware.vshield.tasks import constants
DEFAULT_INTERVAL = 1000
def nop(task):
- return TaskStatus.COMPLETED
+ return constants.TaskStatus.COMPLETED
class TaskException(exceptions.NeutronException):
self.status = None
self._monitors = {
- TaskState.START: [],
- TaskState.EXECUTED: [],
- TaskState.RESULT: []
+ constants.TaskState.START: [],
+ constants.TaskState.EXECUTED: [],
+ constants.TaskState.RESULT: []
}
self._states = [None, None, None, None]
- self._state = TaskState.NONE
+ self._state = constants.TaskState.NONE
def _add_monitor(self, action, func):
self._monitors[action].append(func)
return self
def _start(self):
- return self._invoke_monitor(TaskState.START)
+ return self._invoke_monitor(constants.TaskState.START)
def _executed(self):
- return self._invoke_monitor(TaskState.EXECUTED)
+ return self._invoke_monitor(constants.TaskState.EXECUTED)
def _update_status(self, status):
if self.status == status:
self.status = status
def _finished(self):
- return self._invoke_monitor(TaskState.RESULT)
+ return self._invoke_monitor(constants.TaskState.RESULT)
def add_start_monitor(self, func):
- return self._add_monitor(TaskState.START, func)
+ return self._add_monitor(constants.TaskState.START, func)
def add_executed_monitor(self, func):
- return self._add_monitor(TaskState.EXECUTED, func)
+ return self._add_monitor(constants.TaskState.EXECUTED, func)
def add_result_monitor(self, func):
- return self._add_monitor(TaskState.RESULT, func)
+ return self._add_monitor(constants.TaskState.RESULT, func)
def wait(self, state):
- if (state < TaskState.START or
- state > TaskState.RESULT or
- state == TaskState.STATUS):
+ if (state < constants.TaskState.START or
+ state > constants.TaskState.RESULT or
+ state == constants.TaskState.STATUS):
raise InvalidState(state=state)
if state <= self._state:
'task': str(task),
'cb': str(task._execute_callback)}
LOG.exception(msg)
- status = TaskStatus.ERROR
+ status = constants.TaskStatus.ERROR
LOG.debug(_("Task %(task)s return %(status)s"), {
'task': str(task),
'task': str(task),
'cb': str(task._status_callback)}
LOG.exception(msg)
- status = TaskStatus.ERROR
+ status = constants.TaskStatus.ERROR
task._update_status(status)
- if status != TaskStatus.PENDING:
+ if status != constants.TaskStatus.PENDING:
self._dequeue(task, True)
def _enqueue(self, task):
while tasks:
task = tasks[0]
status = self._execute(task)
- if status == TaskStatus.PENDING:
+ if status == constants.TaskStatus.PENDING:
break
self._dequeue(task, False)
for resource_id in self._tasks.keys():
tasks = list(self._tasks[resource_id])
for task in tasks:
- task._update_status(TaskStatus.ABORT)
+ task._update_status(constants.TaskStatus.ABORT)
self._dequeue(task, False)
def _get_task(self):
# The thread is killed during _execute(). To guarantee
# the task been aborted correctly, put it to the queue.
self._enqueue(task)
- elif task.status != TaskStatus.PENDING:
+ elif task.status != constants.TaskStatus.PENDING:
self._result(task)
else:
self._enqueue(task)
side_effect=[None, {'opts': 'opts_val'}]),
mock.patch(self._AGENT_NAME + '.OVSBridge.get_ofport',
return_value=1),
- mock.patch(self._AGENT_NAME + '.VifPort')
+ mock.patch('neutron.agent.linux.ovs_lib.VifPort')
) as (mock_db, mock_ofport, mock_vif):
br = self.mod_agent.OVSBridge('br_name', 'helper')
vifport = br._get_external_port('iface')
{'opts': 'opts_val'}]),
mock.patch(self._AGENT_NAME + '.OVSBridge.get_ofport',
return_value=1),
- mock.patch(self._AGENT_NAME + '.VifPort')
+ mock.patch('neutron.agent.linux.ovs_lib.VifPort')
) as (mock_db, mock_ofport, mock_vif):
br = self.mod_agent.OVSBridge('br_name', 'helper')
vifport = br._get_external_port('iface')
side_effect=[None, {'remote_ip': '0.0.0.0'}]),
mock.patch(self._AGENT_NAME + '.OVSBridge.get_ofport',
return_value=1),
- mock.patch(self._AGENT_NAME + '.VifPort')
+ mock.patch('neutron.agent.linux.ovs_lib.VifPort')
) as (mock_db, mock_ofport, mock_vif):
br = self.mod_agent.OVSBridge('br_name', 'helper')
vifport = br._get_external_port('iface')
import mock
from neutron.common import exceptions
-from neutron.services.loadbalancer.drivers.haproxy import (
- namespace_driver
-)
+from neutron.services.loadbalancer.drivers.haproxy import namespace_driver
from neutron.tests import base
def test_create_trusted_qos_queue(self):
with mock.patch.object(qos_db.LOG, 'info') as log:
- with mock.patch.object(nsxlib.queue, 'do_request',
+ with mock.patch.object(nsxlib, 'do_request',
return_value={"uuid": "fake_queue"}):
with self.qos_queue(name='fake_lqueue', min=34, max=44,
qos_marking='trusted', default=False) as q:
connector_type = 'stt'
connector_ip = '1.1.1.1'
client_certificate = 'this_should_be_a_certificate'
- with mock.patch.object(l2gwlib, 'do_request') as request_mock:
+ with mock.patch.object(nsxlib, 'do_request') as request_mock:
expected_req_body = self._create_expected_req_body(
display_name, neutron_id, connector_type.upper(),
connector_ip, client_certificate)
connector_type = 'stt'
connector_ip = '1.1.1.1'
client_certificate = 'this_should_be_a_certificate'
- with mock.patch.object(l2gwlib, 'do_request') as request_mock:
+ with mock.patch.object(nsxlib, 'do_request') as request_mock:
expected_req_body = self._create_expected_req_body(
display_name, neutron_id, connector_type.upper(),
connector_ip, client_certificate)
neutron_id = 'whatever'
connector_type = 'stt'
connector_ip = '1.1.1.1'
- with mock.patch.object(l2gwlib, 'do_request') as request_mock:
+ with mock.patch.object(nsxlib, 'do_request') as request_mock:
expected_req_body = self._create_expected_req_body(
display_name, neutron_id, connector_type.upper(),
connector_ip, None)
def test_get_gw_device_status(self):
# NOTE(salv-orlando): This unit test mocks backend calls rather than
# leveraging the fake NVP API client
- with mock.patch.object(l2gwlib, 'do_request') as request_mock:
+ with mock.patch.object(nsxlib, 'do_request') as request_mock:
l2gwlib.get_gateway_device_status(self.fake_cluster, 'whatever')
request_mock.assert_called_once_with(
"GET",
def test_delete_gw_device(self):
# NOTE(salv-orlando): This unit test mocks backend calls rather than
# leveraging the fake NVP API client
- with mock.patch.object(l2gwlib, 'do_request') as request_mock:
+ with mock.patch.object(nsxlib, 'do_request') as request_mock:
l2gwlib.delete_gateway_device(self.fake_cluster, 'whatever')
request_mock.assert_called_once_with(
"DELETE",
def setUp(self):
super(LSNTestCase, self).setUp()
- self.mock_request_p = mock.patch.object(lsnlib, 'do_request')
+ self.mock_request_p = mock.patch(
+ 'neutron.plugins.vmware.nsxlib.do_request')
self.mock_request = self.mock_request_p.start()
self.cluster = mock.Mock()
self.cluster.default_service_cluster_uuid = 'foo'
from neutron.common import exceptions
from neutron.plugins.vmware.api_client import exception as api_exc
+from neutron.plugins.vmware import nsxlib
from neutron.plugins.vmware.nsxlib import queue as queuelib
from neutron.tests.unit.vmware.nsxlib import base
def test_create_and_get_lqueue(self):
queue_id = queuelib.create_lqueue(
self.fake_cluster, self.fake_queue)
- queue_res = queuelib.do_request(
+ queue_res = nsxlib.do_request(
'GET',
- queuelib._build_uri_path('lqueue', resource_id=queue_id),
+ nsxlib._build_uri_path('lqueue', resource_id=queue_id),
cluster=self.fake_cluster)
self.assertEqual(queue_id, queue_res['uuid'])
self.assertEqual('fake_queue', queue_res['display_name'])
def raise_nsx_exc(*args, **kwargs):
raise api_exc.NsxApiException()
- with mock.patch.object(queuelib, 'do_request', new=raise_nsx_exc):
+ with mock.patch.object(nsxlib, 'do_request', new=raise_nsx_exc):
self.assertRaises(
exceptions.NeutronException, queuelib.create_lqueue,
self.fake_cluster, self.fake_queue)
self.fake_cluster, self.fake_queue)
queuelib.delete_lqueue(self.fake_cluster, queue_id)
self.assertRaises(exceptions.NotFound,
- queuelib.do_request,
+ nsxlib.do_request,
'GET',
- queuelib._build_uri_path(
+ nsxlib._build_uri_path(
'lqueue', resource_id=queue_id),
cluster=self.fake_cluster)
'type': 'LogicalRouterStatus',
'lport_link_up_count': 0, }, }
- with mock.patch.object(routerlib, 'do_request',
+ with mock.patch.object(nsxlib, 'do_request',
return_value=self._get_lrouter(tenant_id,
router_name,
router_id,
router_id = 'fake_router_id'
nexthop_ip = '10.0.0.1'
with mock.patch.object(
- routerlib, 'do_request',
+ nsxlib, 'do_request',
return_value=self._get_lrouter(tenant_id,
router_name,
router_id)):
return {'_relations': {'LogicalPortAttachment':
{'peer_port_uuid': lrouter_port['uuid']}}}
# mock get_port
- with mock.patch.object(routerlib, 'get_port', new=fakegetport):
+ with mock.patch.object(switchlib, 'get_port', new=fakegetport):
routerlib.delete_peer_router_lport(self.fake_cluster,
lrouter_port['uuid'],
'whatwever', 'whatever')
def raise_nsx_exc(*args, **kwargs):
raise api_exc.NsxApiException()
- with mock.patch.object(routerlib, 'do_request', new=raise_nsx_exc):
+ with mock.patch.object(nsxlib, 'do_request', new=raise_nsx_exc):
self.assertRaises(
nsx_exc.NsxPluginException, routerlib.update_lrouter_port_ips,
self.fake_cluster, lrouter['uuid'],
from neutron.plugins.vmware.api_client import version as version_module
from neutron.plugins.vmware.common import exceptions as nsx_exc
from neutron.plugins.vmware.common import sync
+from neutron.plugins.vmware.common import utils
from neutron.plugins.vmware.dbexts import db as nsx_db
from neutron.plugins.vmware.extensions import distributedrouter as dist_router
from neutron.plugins.vmware import nsxlib
-from neutron.plugins.vmware.plugins import base
from neutron.tests.unit import _test_extension_portbindings as test_bindings
import neutron.tests.unit.test_db_plugin as test_plugin
import neutron.tests.unit.test_extension_ext_gw_mode as test_ext_gw_mode
def test_create_port_maintenance_returns_503(self):
with self.network() as net:
- with mock.patch.object(nsxlib.switch, 'do_request',
+ with mock.patch.object(nsxlib, 'do_request',
side_effect=nsx_exc.MaintenanceInProgress):
data = {'port': {'network_id': net['network']['id'],
'admin_state_up': False,
data = {'network': {'name': 'foo',
'admin_state_up': True,
'tenant_id': self._tenant_id}}
- with mock.patch.object(nsxlib.switch, 'do_request',
+ with mock.patch.object(nsxlib, 'do_request',
side_effect=nsx_exc.MaintenanceInProgress):
net_req = self.new_create_request('networks', data, self.fmt)
res = net_req.get_response(self.api)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
- def test_update_security_group_deal_with_exc(self):
- name = 'foo security group'
- with mock.patch.object(nsxlib.switch, 'do_request',
- side_effect=api_exc.NsxApiException):
- with self.security_group(name=name) as sg:
- self.assertEqual(sg['security_group']['name'], name)
-
class TestL3ExtensionManager(object):
def _create_l3_ext_network(self, vlan_id=None):
name = 'l3_ext_net'
- net_type = base.NetworkTypes.L3_EXT
+ net_type = utils.NetworkTypes.L3_EXT
providernet_args = {pnet.NETWORK_TYPE: net_type,
pnet.PHYSICAL_NETWORK: 'l3_gw_uuid'}
if vlan_id:
def _test_create_l3_ext_network(self, vlan_id=None):
name = 'l3_ext_net'
- net_type = base.NetworkTypes.L3_EXT
+ net_type = utils.NetworkTypes.L3_EXT
expected = [('subnets', []), ('name', name), ('admin_state_up', True),
('status', 'ACTIVE'), ('shared', False),
(external_net.EXTERNAL, True),
with self._create_l3_ext_network() as net:
with self.subnet(network=net) as s:
with mock.patch.object(
- nsxlib.router,
+ nsxlib,
'do_request',
side_effect=nsx_exc.MaintenanceInProgress):
data = {'router': {'tenant_id': 'whatever'}}