Implement API/driver interface for configuring vShield Edge Appliance.
Currently implemented functions:
- Deploy an Edge
- Destroy an Edge
- Configuring interfaces
- Configuring SNAT/DNAT rules
- Configuring default gateway and static routes
- Query Edge status
- Task-based asynchronous model
- Allow old routes/nat config to be skipped if new updates are coming
Implements: blueprint vcns-driver
Change-Id: I881bde907f4c90de4c919d008b76b8c2a2d0e1fd
# that using the minimum chunk size will cause the interval between two
# requests to be less than min_sync_req_delay
# min_chunk_size = 500
+
+[vcns]
+# URL for VCNS manager
+# manager_uri = https://management_ip
+
+# User name for VCNS manager
+# user = admin
+
+# Password for VCNS manager
+# password = default
+
+# (Optional) Datacenter ID for Edge deployment
+# datacenter_moid =
+
+# (Optional) Deployment Container ID for NSX Edge deployment
+# If not specified, either a default global container will be used, or
+# the resource pool and datastore specified below will be used
+# deployment_container_id =
+
+# (Optional) Resource pool ID for NSX Edge deployment
+# resource_pool_id =
+
+# (Optional) Datastore ID for NSX Edge deployment
+# datastore_id =
+
+# (Required) UUID of logic switch for physical network connectivity
+# external_network =
+
+# (Optional) Asynchronous task status check interval
+# default is 2000 (millisecond)
+# task_status_check_interval = 2000
"network connection")),
]
+DEFAULT_STATUS_CHECK_INTERVAL = 2000
+
+vcns_opts = [
+ cfg.StrOpt('user',
+ default='admin',
+ help=_('User name for vsm')),
+ cfg.StrOpt('password',
+ default='default',
+ secret=True,
+ help=_('Password for vsm')),
+ cfg.StrOpt('manager_uri',
+ help=_('uri for vsm')),
+ cfg.StrOpt('datacenter_moid',
+ help=_('Optional parameter identifying the ID of datacenter '
+ 'to deploy NSX Edges')),
+ cfg.StrOpt('deployment_container_id',
+ help=_('Optional parameter identifying the ID of datastore to '
+ 'deploy NSX Edges')),
+ cfg.StrOpt('resource_pool_id',
+ help=_('Optional parameter identifying the ID of resource to '
+ 'deploy NSX Edges')),
+ cfg.StrOpt('datastore_id',
+ help=_('Optional parameter identifying the ID of datastore to '
+ 'deploy NSX Edges')),
+ cfg.StrOpt('external_network',
+ help=_('Network ID for physical network connectivity')),
+ cfg.IntOpt('task_status_check_interval',
+ default=DEFAULT_STATUS_CHECK_INTERVAL,
+ help=_("Task status check interval"))
+]
+
# Register the configuration options
cfg.CONF.register_opts(connection_opts)
cfg.CONF.register_opts(cluster_opts)
cfg.CONF.register_opts(nvp_opts, "NVP")
cfg.CONF.register_opts(sync_opts, "NVP_SYNC")
-
+cfg.CONF.register_opts(vcns_opts, group="vcns")
# NOTE(armando-migliaccio): keep the following code until we support
# NVP configuration files in older format (Grizzly or older).
# ### BEGIN
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 VMware, Inc.
+# All Rights Reserved
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 VMware, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: linb, VMware
+
+import base64
+
+import eventlet
+
+from neutron.openstack.common import jsonutils
+from neutron.plugins.nicira.vshield.common import exceptions
+
+httplib2 = eventlet.import_patched('httplib2')
+
+
+def xmldumps(obj):
+ config = ""
+ if isinstance(obj, dict):
+ for key, value in obj.iteritems():
+ cfg = "<%s>%s</%s>" % (key, xmldumps(value), key)
+ config += cfg
+ elif isinstance(obj, list):
+ for value in obj:
+ config += xmldumps(value)
+ else:
+ config = obj
+
+ return config
+
+
+class VcnsApiHelper(object):
+ errors = {
+ 303: exceptions.ResourceRedirect,
+ 400: exceptions.RequestBad,
+ 403: exceptions.Forbidden,
+ 404: exceptions.ResourceNotFound,
+ 415: exceptions.MediaTypeUnsupport,
+ 503: exceptions.ServiceUnavailable
+ }
+
+ def __init__(self, address, user, password, format='json'):
+ self.authToken = base64.encodestring("%s:%s" % (user, password))
+ self.user = user
+ self.passwd = password
+ self.address = address
+ self.format = format
+ if format == 'json':
+ self.encode = jsonutils.dumps
+ else:
+ self.encode = xmldumps
+
+ def request(self, method, uri, params=None):
+ uri = self.address + uri
+ http = httplib2.Http()
+ http.disable_ssl_certificate_validation = True
+ headers = {
+ 'Content-Type': 'application/' + self.format,
+ 'Accept': 'application/' + 'json',
+ 'Authorization': 'Basic ' + self.authToken
+ }
+ body = self.encode(params) if params else None
+ header, response = http.request(uri, method,
+ body=body, headers=headers)
+ status = int(header['status'])
+ if 200 <= status < 300:
+ return header, response
+ if status in self.errors:
+ cls = self.errors[status]
+ else:
+ cls = exceptions.VcnsApiException
+ raise cls(uri=uri, status=status, header=header, response=response)
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+EDGE_ID = 'edge_id'
+ROUTER_ID = 'router_id'
+
+# Interface
+EXTERNAL_VNIC_INDEX = 0
+INTERNAL_VNIC_INDEX = 1
+EXTERNAL_VNIC_NAME = "external"
+INTERNAL_VNIC_NAME = "internal"
+
+INTEGRATION_LR_IPADDRESS = "169.254.2.1/28"
+INTEGRATION_EDGE_IPADDRESS = "169.254.2.3"
+INTEGRATION_SUBNET_NETMASK = "255.255.255.240"
+
+# SNAT rule location
+PREPEND = 0
+APPEND = -1
+
+# error code
+VCNS_ERROR_CODE_EDGE_NOT_RUNNING = 10013
+
+
+# router status by number
+class RouterStatus(object):
+ ROUTER_STATUS_ACTIVE = 0
+ ROUTER_STATUS_DOWN = 1
+ ROUTER_STATUS_PENDING_CREATE = 2
+ ROUTER_STATUS_PENDING_DELETE = 3
+ ROUTER_STATUS_ERROR = 4
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 VMware, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: linb, VMware
+
+from neutron.common import exceptions
+
+
+class VcnsException(exceptions.NeutronException):
+ pass
+
+
+class VcnsGeneralException(VcnsException):
+ def __init__(self, message):
+ self.message = message
+ super(VcnsGeneralException, self).__init__()
+
+
+class VcnsApiException(VcnsException):
+ message = _("An unknown exception %(status)s occurred: %(response)s.")
+
+ def __init__(self, **kwargs):
+ super(VcnsApiException, self).__init__(**kwargs)
+
+ self.status = kwargs.get('status')
+ self.header = kwargs.get('header')
+ self.response = kwargs.get('response')
+
+
+class ResourceRedirect(VcnsApiException):
+ message = _("Resource %(uri)s has been redirected")
+
+
+class RequestBad(VcnsApiException):
+ message = _("Request %(uri)s is Bad, response %(response)s")
+
+
+class Forbidden(VcnsApiException):
+ message = _("Forbidden: %(uri)s")
+
+
+class ResourceNotFound(VcnsApiException):
+ message = _("Resource %(uri)s not found")
+
+
+class MediaTypeUnsupport(VcnsApiException):
+ message = _("Media Type %(uri)s is not supported")
+
+
+class ServiceUnavailable(VcnsApiException):
+ message = _("Service on available: %(uri)s")
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 VMware, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Kaiwei Fan, VMware, Inc.
+# @author: Bo Link, VMware, Inc.
+
+from neutron.openstack.common import jsonutils
+from neutron.openstack.common import log as logging
+from neutron.plugins.nicira.vshield.common import (
+ constants as vcns_const)
+from neutron.plugins.nicira.vshield.common.constants import RouterStatus
+from neutron.plugins.nicira.vshield.common import exceptions
+from neutron.plugins.nicira.vshield.tasks.constants import TaskState
+from neutron.plugins.nicira.vshield.tasks.constants import TaskStatus
+from neutron.plugins.nicira.vshield.tasks import tasks
+
+LOG = logging.getLogger(__name__)
+
+
+class EdgeApplianceDriver(object):
+ def __init__(self):
+ # store the last task per edge that has the latest config
+ self.updated_task = {
+ 'nat': {},
+ 'route': {},
+ }
+
+ def _assemble_edge(self, name, appliance_size="compact",
+ deployment_container_id=None, datacenter_moid=None,
+ enable_aesni=True, hypervisor_assist=False,
+ enable_fips=False, remote_access=False):
+ edge = {
+ 'name': name,
+ 'fqdn': name,
+ 'hypervisorAssist': hypervisor_assist,
+ 'type': 'gatewayServices',
+ 'enableAesni': enable_aesni,
+ 'enableFips': enable_fips,
+ 'cliSettings': {
+ 'remoteAccess': remote_access
+ },
+ 'appliances': {
+ 'applianceSize': appliance_size
+ },
+ 'vnics': {
+ 'vnics': []
+ }
+ }
+ if deployment_container_id:
+ edge['appliances']['deploymentContainerId'] = (
+ deployment_container_id)
+ if datacenter_moid:
+ edge['datacenterMoid'] = datacenter_moid,
+
+ return edge
+
+ def _assemble_edge_appliance(self, resource_pool_id, datastore_id):
+ appliance = {}
+ if resource_pool_id:
+ appliance['resourcePoolId'] = resource_pool_id
+ if datastore_id:
+ appliance['datastoreId'] = datastore_id
+ return appliance
+
+ def _assemble_edge_vnic(self, name, index, portgroup_id,
+ primary_address=None, subnet_mask=None,
+ secondary=None,
+ type="internal",
+ enable_proxy_arp=False,
+ enable_send_redirects=True,
+ is_connected=True,
+ mtu=1500):
+ vnic = {
+ 'index': index,
+ 'name': name,
+ 'type': type,
+ 'portgroupId': portgroup_id,
+ 'mtu': mtu,
+ 'enableProxyArp': enable_proxy_arp,
+ 'enableSendRedirects': enable_send_redirects,
+ 'isConnected': is_connected
+ }
+ if primary_address and subnet_mask:
+ address_group = {
+ 'primaryAddress': primary_address,
+ 'subnetMask': subnet_mask
+ }
+ if secondary:
+ address_group['secondaryAddresses'] = {
+ 'ipAddress': secondary
+ }
+
+ vnic['addressGroups'] = {
+ 'addressGroups': [address_group]
+ }
+
+ return vnic
+
+ def _edge_status_to_level(self, status):
+ if status == 'GREEN':
+ status_level = RouterStatus.ROUTER_STATUS_ACTIVE
+ elif status in ('GREY', 'YELLOW'):
+ status_level = RouterStatus.ROUTER_STATUS_DOWN
+ else:
+ status_level = RouterStatus.ROUTER_STATUS_ERROR
+ return status_level
+
+ def get_edge_status(self, edge_id):
+ try:
+ response = self.vcns.get_edge_status(edge_id)[1]
+ status_level = self._edge_status_to_level(
+ response['edgeStatus'])
+ except exceptions.VcnsApiException as e:
+ LOG.exception(_("VCNS: Failed to get edge status:\n%s"),
+ e.response)
+ status_level = RouterStatus.ROUTER_STATUS_ERROR
+ try:
+ desc = jsonutils.loads(e.response)
+ if desc.get('errorCode') == (
+ vcns_const.VCNS_ERROR_CODE_EDGE_NOT_RUNNING):
+ status_level = RouterStatus.ROUTER_STATUS_DOWN
+ except ValueError:
+ LOG.exception(e.response)
+
+ return status_level
+
+ def get_edges_statuses(self):
+ edges_status_level = {}
+ edges = self._get_edges()
+ for edge in edges['edgePage'].get('data', []):
+ edge_id = edge['id']
+ status = edge['edgeStatus']
+ edges_status_level[edge_id] = self._edge_status_to_level(status)
+
+ return edges_status_level
+
+ def _update_interface(self, task):
+ edge_id = task.userdata['edge_id']
+ config = task.userdata['config']
+ LOG.debug(_("VCNS: start updating vnic %s"), config)
+ try:
+ self.vcns.update_interface(edge_id, config)
+ except exceptions.VcnsApiException as e:
+ LOG.exception(_("VCNS: Failed to update vnic %(config)s:\n"
+ "%(response)s"), {
+ 'config': config,
+ 'response': e.response})
+ raise e
+ except Exception as e:
+ LOG.exception(_("VCNS: Failed to update vnic %d"),
+ config['index'])
+ raise e
+
+ return TaskStatus.COMPLETED
+
+ def update_interface(self, router_id, edge_id, index, network,
+ address=None, netmask=None, secondary=None,
+ jobdata=None):
+ LOG.debug(_("VCNS: update vnic %(index)d: %(addr)s %(netmask)s"), {
+ 'index': index, 'addr': address, 'netmask': netmask})
+ if index == vcns_const.EXTERNAL_VNIC_INDEX:
+ name = vcns_const.EXTERNAL_VNIC_NAME
+ intf_type = 'uplink'
+ elif index == vcns_const.INTERNAL_VNIC_INDEX:
+ name = vcns_const.INTERNAL_VNIC_NAME
+ intf_type = 'internal'
+ else:
+ msg = _("Vnic %d currently not supported") % index
+ raise exceptions.VcnsGeneralException(msg)
+
+ config = self._assemble_edge_vnic(
+ name, index, network, address, netmask, secondary, type=intf_type)
+
+ userdata = {
+ 'edge_id': edge_id,
+ 'config': config,
+ 'jobdata': jobdata
+ }
+ task_name = "update-interface-%s-%d" % (edge_id, index)
+ task = tasks.Task(task_name, router_id,
+ self._update_interface, userdata=userdata)
+ task.add_result_monitor(self.callbacks.interface_update_result)
+ self.task_manager.add(task)
+ return task
+
+ def _deploy_edge(self, task):
+ userdata = task.userdata
+ name = userdata['router_name']
+ LOG.debug(_("VCNS: start deploying edge %s"), name)
+ request = userdata['request']
+ try:
+ header = self.vcns.deploy_edge(request)[0]
+ objuri = header['location']
+ job_id = objuri[objuri.rfind("/") + 1:]
+ response = self.vcns.get_edge_id(job_id)[1]
+ edge_id = response['edgeId']
+ LOG.debug(_("VCNS: deploying edge %s"), edge_id)
+ userdata['edge_id'] = edge_id
+ status = TaskStatus.PENDING
+ except exceptions.VcnsApiException as e:
+ LOG.exception(_("VCNS: deploy edge failed for router %s."),
+ name)
+ raise e
+
+ return status
+
+ def _status_edge(self, task):
+ edge_id = task.userdata['edge_id']
+ try:
+ response = self.vcns.get_edge_deploy_status(edge_id)[1]
+ task.userdata['retries'] = 0
+ system_status = response.get('systemStatus', None)
+ if system_status is None:
+ status = TaskStatus.PENDING
+ elif system_status == 'good':
+ status = TaskStatus.COMPLETED
+ else:
+ status = TaskStatus.ERROR
+ except exceptions.VcnsApiException as e:
+ LOG.exception(_("VCNS: Edge %s status query failed."), edge_id)
+ raise e
+ except Exception as e:
+ retries = task.userdata.get('retries', 0) + 1
+ if retries < 3:
+ task.userdata['retries'] = retries
+ msg = _("VCNS: Unable to retrieve edge %(edge_id)s status. "
+ "Retry %(retries)d.") % {
+ 'edge_id': edge_id,
+ 'retries': retries}
+ LOG.exception(msg)
+ status = TaskStatus.PENDING
+ else:
+ msg = _("VCNS: Unable to retrieve edge %s status. "
+ "Abort.") % edge_id
+ LOG.exception(msg)
+ status = TaskStatus.ERROR
+ LOG.debug(_("VCNS: Edge %s status"), edge_id)
+ return status
+
+ def _result_edge(self, task):
+ router_name = task.userdata['router_name']
+ edge_id = task.userdata.get('edge_id')
+ if task.status != TaskStatus.COMPLETED:
+ LOG.error(_("VCNS: Failed to deploy edge %(edge_id)s "
+ "for %(name)s, status %(status)d"), {
+ 'edge_id': edge_id,
+ 'name': router_name,
+ 'status': task.status
+ })
+ else:
+ LOG.debug(_("VCNS: Edge %(edge_id)s deployed for "
+ "router %(name)s"), {
+ 'edge_id': edge_id, 'name': router_name
+ })
+
+ def _delete_edge(self, task):
+ edge_id = task.userdata['edge_id']
+ LOG.debug(_("VCNS: start destroying edge %s"), edge_id)
+ status = TaskStatus.COMPLETED
+ if edge_id:
+ try:
+ self.vcns.delete_edge(edge_id)
+ except exceptions.ResourceNotFound:
+ pass
+ except exceptions.VcnsApiException as e:
+ msg = _("VCNS: Failed to delete %{edge_id)s:\n"
+ "%(response)s") % {
+ 'edge_id': edge_id, 'response': e.response}
+ LOG.exception(msg)
+ status = TaskStatus.ERROR
+ except Exception:
+ LOG.exception(_("VCNS: Failed to delete %s"), edge_id)
+ status = TaskStatus.ERROR
+
+ return status
+
+ def _get_edges(self):
+ try:
+ return self.vcns.get_edges()[1]
+ except exceptions.VcnsApiException as e:
+ LOG.exception(_("VCNS: Failed to get edges:\n%s"), e.response)
+ raise e
+
+ def deploy_edge(self, router_id, name, internal_network, jobdata=None,
+ wait_for_exec=False):
+ task_name = 'deploying-%s' % name
+ edge_name = name
+ edge = self._assemble_edge(
+ edge_name, datacenter_moid=self.datacenter_moid,
+ deployment_container_id=self.deployment_container_id,
+ appliance_size='large', remote_access=True)
+ appliance = self._assemble_edge_appliance(self.resource_pool_id,
+ self.datastore_id)
+ if appliance:
+ edge['appliances']['appliances'] = [appliance]
+
+ vnic_external = self._assemble_edge_vnic(
+ vcns_const.EXTERNAL_VNIC_NAME, vcns_const.EXTERNAL_VNIC_INDEX,
+ self.external_network, type="uplink")
+ edge['vnics']['vnics'].append(vnic_external)
+ vnic_inside = self._assemble_edge_vnic(
+ vcns_const.INTERNAL_VNIC_NAME, vcns_const.INTERNAL_VNIC_INDEX,
+ internal_network,
+ vcns_const.INTEGRATION_EDGE_IPADDRESS,
+ vcns_const.INTEGRATION_SUBNET_NETMASK,
+ type="internal")
+ edge['vnics']['vnics'].append(vnic_inside)
+ userdata = {
+ 'request': edge,
+ 'router_name': name,
+ 'jobdata': jobdata
+ }
+ task = tasks.Task(task_name, router_id,
+ self._deploy_edge,
+ status_callback=self._status_edge,
+ result_callback=self._result_edge,
+ userdata=userdata)
+ task.add_executed_monitor(self.callbacks.edge_deploy_started)
+ task.add_result_monitor(self.callbacks.edge_deploy_result)
+ self.task_manager.add(task)
+
+ if wait_for_exec:
+ # waitl until the deploy task is executed so edge_id is available
+ task.wait(TaskState.EXECUTED)
+
+ return task
+
+ def delete_edge(self, router_id, edge_id, jobdata=None):
+ task_name = 'delete-%s' % edge_id
+ userdata = {
+ 'router_id': router_id,
+ 'edge_id': edge_id,
+ 'jobdata': jobdata
+ }
+ task = tasks.Task(task_name, router_id, self._delete_edge,
+ userdata=userdata)
+ task.add_result_monitor(self.callbacks.edge_delete_result)
+ self.task_manager.add(task)
+ return task
+
+ def _assemble_nat_rule(self, action, original_address,
+ translated_address,
+ vnic_index=vcns_const.EXTERNAL_VNIC_INDEX,
+ enabled=True):
+ nat_rule = {}
+ nat_rule['action'] = action
+ nat_rule['vnic'] = vnic_index
+ nat_rule['originalAddress'] = original_address
+ nat_rule['translatedAddress'] = translated_address
+ nat_rule['enabled'] = enabled
+ return nat_rule
+
+ def get_nat_config(self, edge_id):
+ try:
+ return self.vcns.get_nat_config(edge_id)[1]
+ except exceptions.VcnsApiException as e:
+ LOG.exception(_("VCNS: Failed to get nat config:\n%s"),
+ e.response)
+ raise e
+
+ def _create_nat_rule(self, task):
+ # TODO(fank): use POST for optimization
+ # return rule_id for future reference
+ rule = task.userdata['rule']
+ LOG.debug(_("VCNS: start creating nat rules: %s"), rule)
+ edge_id = task.userdata['edge_id']
+ nat = self.get_nat_config(edge_id)
+ location = task.userdata['location']
+
+ del nat['version']
+
+ if location is None or location == vcns_const.APPEND:
+ nat['rules']['natRulesDtos'].append(rule)
+ else:
+ nat['rules']['natRulesDtos'].insert(location, rule)
+
+ try:
+ self.vcns.update_nat_config(edge_id, nat)
+ status = TaskStatus.COMPLETED
+ except exceptions.VcnsApiException as e:
+ LOG.exception(_("VCNS: Failed to create snat rule:\n%s"),
+ e.response)
+ status = TaskStatus.ERROR
+
+ return status
+
+ def create_snat_rule(self, router_id, edge_id, src, translated,
+ jobdata=None, location=None):
+ LOG.debug(_("VCNS: create snat rule %(src)s/%(translated)s"), {
+ 'src': src, 'translated': translated})
+ snat_rule = self._assemble_nat_rule("snat", src, translated)
+ userdata = {
+ 'router_id': router_id,
+ 'edge_id': edge_id,
+ 'rule': snat_rule,
+ 'location': location,
+ 'jobdata': jobdata
+ }
+ task_name = "create-snat-%s-%s-%s" % (edge_id, src, translated)
+ task = tasks.Task(task_name, router_id, self._create_nat_rule,
+ userdata=userdata)
+ task.add_result_monitor(self.callbacks.snat_create_result)
+ self.task_manager.add(task)
+ return task
+
+ def _delete_nat_rule(self, task):
+ # TODO(fank): pass in rule_id for optimization
+ # handle routes update for optimization
+ edge_id = task.userdata['edge_id']
+ address = task.userdata['address']
+ addrtype = task.userdata['addrtype']
+ LOG.debug(_("VCNS: start deleting %(type)s rules: %(addr)s"), {
+ 'type': addrtype, 'addr': address})
+ nat = self.get_nat_config(edge_id)
+ del nat['version']
+ status = TaskStatus.COMPLETED
+ for nat_rule in nat['rules']['natRulesDtos']:
+ if nat_rule[addrtype] == address:
+ rule_id = nat_rule['ruleId']
+ try:
+ self.vcns.delete_nat_rule(edge_id, rule_id)
+ except exceptions.VcnsApiException as e:
+ LOG.exception(_("VCNS: Failed to delete snat rule:\n"
+ "%s"), e.response)
+ status = TaskStatus.ERROR
+
+ return status
+
+ def delete_snat_rule(self, router_id, edge_id, src, jobdata=None):
+ LOG.debug(_("VCNS: delete snat rule %s"), src)
+ userdata = {
+ 'edge_id': edge_id,
+ 'address': src,
+ 'addrtype': 'originalAddress',
+ 'jobdata': jobdata
+ }
+ task_name = "delete-snat-%s-%s" % (edge_id, src)
+ task = tasks.Task(task_name, router_id, self._delete_nat_rule,
+ userdata=userdata)
+ task.add_result_monitor(self.callbacks.snat_delete_result)
+ self.task_manager.add(task)
+ return task
+
+ def create_dnat_rule(self, router_id, edge_id, dst, translated,
+ jobdata=None, location=None):
+ # TODO(fank): use POST for optimization
+ # return rule_id for future reference
+ LOG.debug(_("VCNS: create dnat rule %(dst)s/%(translated)s"), {
+ 'dst': dst, 'translated': translated})
+ dnat_rule = self._assemble_nat_rule(
+ "dnat", dst, translated)
+ userdata = {
+ 'router_id': router_id,
+ 'edge_id': edge_id,
+ 'rule': dnat_rule,
+ 'location': location,
+ 'jobdata': jobdata
+ }
+ task_name = "create-dnat-%s-%s-%s" % (edge_id, dst, translated)
+ task = tasks.Task(task_name, router_id, self._create_nat_rule,
+ userdata=userdata)
+ task.add_result_monitor(self.callbacks.dnat_create_result)
+ self.task_manager.add(task)
+ return task
+
+ def delete_dnat_rule(self, router_id, edge_id, translated,
+ jobdata=None):
+ # TODO(fank): pass in rule_id for optimization
+ LOG.debug(_("VCNS: delete dnat rule %s"), translated)
+ userdata = {
+ 'edge_id': edge_id,
+ 'address': translated,
+ 'addrtype': 'translatedAddress',
+ 'jobdata': jobdata
+ }
+ task_name = "delete-dnat-%s-%s" % (edge_id, translated)
+ task = tasks.Task(task_name, router_id, self._delete_nat_rule,
+ userdata=userdata)
+ task.add_result_monitor(self.callbacks.dnat_delete_result)
+ self.task_manager.add(task)
+ return task
+
+ def _update_nat_rule(self, task):
+ # TODO(fank): use POST for optimization
+ # return rule_id for future reference
+ edge_id = task.userdata['edge_id']
+ if task != self.updated_task['nat'][edge_id]:
+ # this task does not have the latest config, abort now
+ # for speedup
+ return TaskStatus.ABORT
+
+ rules = task.userdata['rules']
+ LOG.debug(_("VCNS: start updating nat rules: %s"), rules)
+
+ nat = {
+ 'rules': {
+ 'natRulesDtos': rules
+ }
+ }
+
+ try:
+ self.vcns.update_nat_config(edge_id, nat)
+ status = TaskStatus.COMPLETED
+ except exceptions.VcnsApiException as e:
+ LOG.exception(_("VCNS: Failed to create snat rule:\n%s"),
+ e.response)
+ status = TaskStatus.ERROR
+
+ return status
+
+ def update_nat_rules(self, router_id, edge_id, snats, dnats,
+ jobdata=None):
+ LOG.debug(_("VCNS: update nat rule\n"
+ "SNAT:%(snat)s\n"
+ "DNAT:%(dnat)s\n"), {
+ 'snat': snats, 'dnat': dnats})
+ nat_rules = []
+
+ for dnat in dnats:
+ nat_rules.append(self._assemble_nat_rule(
+ 'dnat', dnat['dst'], dnat['translated']))
+ nat_rules.append(self._assemble_nat_rule(
+ 'snat', dnat['translated'], dnat['dst']))
+
+ for snat in snats:
+ nat_rules.append(self._assemble_nat_rule(
+ 'snat', snat['src'], snat['translated']))
+
+ userdata = {
+ 'edge_id': edge_id,
+ 'rules': nat_rules,
+ 'jobdata': jobdata,
+ }
+ task_name = "update-nat-%s" % edge_id
+ task = tasks.Task(task_name, router_id, self._update_nat_rule,
+ userdata=userdata)
+ task.add_result_monitor(self.callbacks.nat_update_result)
+ self.updated_task['nat'][edge_id] = task
+ self.task_manager.add(task)
+ return task
+
+ def _update_routes(self, task):
+ edge_id = task.userdata['edge_id']
+ if (task != self.updated_task['route'][edge_id] and
+ task.userdata.get('skippable', True)):
+ # this task does not have the latest config, abort now
+ # for speedup
+ return TaskStatus.ABORT
+ gateway = task.userdata['gateway']
+ routes = task.userdata['routes']
+ LOG.debug(_("VCNS: start updating routes for %s"), edge_id)
+ static_routes = []
+ for route in routes:
+ static_routes.append({
+ "route": {
+ "description": "",
+ "vnic": vcns_const.INTERNAL_VNIC_INDEX,
+ "network": route['cidr'],
+ "nextHop": route['nexthop']
+ }
+ })
+ request = {
+ "staticRouting": {
+ "staticRoutes": static_routes,
+ }
+ }
+ if gateway:
+ request["staticRouting"]["defaultRoute"] = {
+ "description": "default-gateway",
+ "gatewayAddress": gateway,
+ "vnic": vcns_const.EXTERNAL_VNIC_INDEX
+ }
+ try:
+ self.vcns.update_routes(edge_id, request)
+ status = TaskStatus.COMPLETED
+ except exceptions.VcnsApiException as e:
+ LOG.exception(_("VCNS: Failed to update routes:\n%s"),
+ e.response)
+ status = TaskStatus.ERROR
+
+ return status
+
+ def update_routes(self, router_id, edge_id, gateway, routes,
+ skippable=True, jobdata=None):
+ if gateway:
+ gateway = gateway.split('/')[0]
+
+ userdata = {
+ 'edge_id': edge_id,
+ 'gateway': gateway,
+ 'routes': routes,
+ 'skippable': skippable,
+ 'jobdata': jobdata
+ }
+ task_name = "update-routes-%s" % (edge_id)
+ task = tasks.Task(task_name, router_id, self._update_routes,
+ userdata=userdata)
+ task.add_result_monitor(self.callbacks.routes_update_result)
+ self.updated_task['route'][edge_id] = task
+ self.task_manager.add(task)
+ return task
+
+ def create_lswitch(self, name, tz_config):
+ lsconfig = {
+ 'display_name': name,
+ "tags": [],
+ "type": "LogicalSwitchConfig",
+ "_schema": "/ws.v1/schema/LogicalSwitchConfig",
+ "port_isolation_enabled": False,
+ "replication_mode": "service",
+ "transport_zones": tz_config
+ }
+
+ response = self.vcns.create_lswitch(lsconfig)[1]
+ return response
+
+ def delete_lswitch(self, lswitch_id):
+ self.vcns.delete_lswitch(lswitch_id)
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 VMware, Inc.
+# All Rights Reserved
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 VMware, Inc.
+# All Rights Reserved
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+class TaskStatus(object):
+ """Task running status.
+
+ This is used by execution/status callback function to notify the
+ task manager what's the status of current task, and also used for
+ indication the final task execution result.
+ """
+ PENDING = 1
+ COMPLETED = 2
+ ERROR = 3
+ ABORT = 4
+
+
+class TaskState(object):
+ """Current state of a task.
+
+ This is to keep track of the current state of a task.
+ NONE: the task is still in the queue
+ START: the task is pull out from the queue and is about to be executed
+ EXECUTED: the task has been executed
+ STATUS: we're running periodic status check for this task
+ RESULT: the task has finished and result is ready
+ """
+ NONE = -1
+ START = 0
+ EXECUTED = 1
+ STATUS = 2
+ RESULT = 3
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 VMware, Inc.
+# All Rights Reserved
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import collections
+import uuid
+
+from eventlet import event
+from eventlet import greenthread
+from eventlet.support import greenlets as greenlet
+
+from neutron.common import exceptions
+from neutron.openstack.common import log as logging
+from neutron.openstack.common import loopingcall
+from neutron.plugins.nicira.vshield.tasks.constants import TaskState
+from neutron.plugins.nicira.vshield.tasks.constants import TaskStatus
+
+DEFAULT_INTERVAL = 1000
+
+LOG = logging.getLogger(__name__)
+
+
+def nop(task):
+ return TaskStatus.COMPLETED
+
+
+class TaskException(exceptions.NeutronException):
+
+ def __init__(self, message=None, **kwargs):
+ if message is not None:
+ self.message = message
+
+ super(TaskException, self).__init__(**kwargs)
+
+
+class InvalidState(TaskException):
+ message = _("Invalid state %(state)d")
+
+
+class TaskStateSkipped(TaskException):
+ message = _("State %(state)d skipped. Current state %(current)d")
+
+
+class Task():
+ def __init__(self, name, resource_id, execute_callback,
+ status_callback=nop, result_callback=nop, userdata=None):
+ self.name = name
+ self.resource_id = resource_id
+ self._execute_callback = execute_callback
+ self._status_callback = status_callback
+ self._result_callback = result_callback
+ self.userdata = userdata
+ self.id = None
+ self.status = None
+
+ self._monitors = {
+ TaskState.START: [],
+ TaskState.EXECUTED: [],
+ TaskState.RESULT: []
+ }
+ self._states = [None, None, None, None]
+ self._state = TaskState.NONE
+
+ def _add_monitor(self, action, func):
+ self._monitors[action].append(func)
+ return self
+
+ def _move_state(self, state):
+ self._state = state
+ if self._states[state] is not None:
+ e = self._states[state]
+ self._states[state] = None
+ e.send()
+
+ for s in range(state):
+ if self._states[s] is not None:
+ e = self._states[s]
+ self._states[s] = None
+ e.send_exception(
+ TaskStateSkipped(state=s, current=self._state))
+
+ def _invoke_monitor(self, state):
+ for func in self._monitors[state]:
+ try:
+ func(self)
+ except Exception:
+ msg = _("Task %(task)s encountered exception in %(func)s "
+ "at state %(state)s") % {
+ 'task': str(self),
+ 'func': str(func),
+ 'state': state}
+ LOG.exception(msg)
+
+ self._move_state(state)
+
+ return self
+
+ def _start(self):
+ return self._invoke_monitor(TaskState.START)
+
+ def _executed(self):
+ return self._invoke_monitor(TaskState.EXECUTED)
+
+ def _update_status(self, status):
+ if self.status == status:
+ return self
+
+ self.status = status
+
+ def _finished(self):
+ return self._invoke_monitor(TaskState.RESULT)
+
+ def add_start_monitor(self, func):
+ return self._add_monitor(TaskState.START, func)
+
+ def add_executed_monitor(self, func):
+ return self._add_monitor(TaskState.EXECUTED, func)
+
+ def add_result_monitor(self, func):
+ return self._add_monitor(TaskState.RESULT, func)
+
+ def wait(self, state):
+ if (state < TaskState.START or
+ state > TaskState.RESULT or
+ state == TaskState.STATUS):
+ raise InvalidState(state=state)
+
+ if state <= self._state:
+ # we already passed this current state, so no wait
+ return
+
+ e = event.Event()
+ self._states[state] = e
+ e.wait()
+
+ def __repr__(self):
+ return "Task-%s-%s-%s" % (
+ self.name, self.resource_id, self.id)
+
+
+class TaskManager():
+
+ _instance = None
+ _default_interval = DEFAULT_INTERVAL
+
+ def __init__(self, interval=None):
+ self._interval = interval or TaskManager._default_interval
+
+ # A queue to pass tasks from other threads
+ self._tasks_queue = collections.deque()
+
+ # A dict to store resource -> resource's tasks
+ self._tasks = {}
+
+ # New request event
+ self._req = event.Event()
+
+ # TaskHandler stopped event
+ self._stopped = event.Event()
+
+ # Periodic function trigger
+ self._monitor = None
+ self._monitor_busy = False
+ self._monitor_stop = None
+
+ # Thread handling the task request
+ self._thread = None
+
+ def _execute(self, task):
+ """Execute task."""
+ msg = _("Start task %s") % str(task)
+ LOG.debug(msg)
+ task._start()
+ try:
+ status = task._execute_callback(task)
+ except Exception:
+ msg = _("Task %(task)s encountered exception in %(cb)s") % {
+ 'task': str(task),
+ 'cb': str(task._execute_callback)}
+ LOG.exception(msg)
+ status = TaskStatus.ERROR
+
+ LOG.debug(_("Task %(task)s return %(status)s"), {
+ 'task': str(task),
+ 'status': status})
+
+ task._update_status(status)
+ task._executed()
+
+ return status
+
+ def _result(self, task):
+ """Notify task execution result."""
+ try:
+ task._result_callback(task)
+ except Exception:
+ msg = _("Task %(task)s encountered exception in %(cb)s") % {
+ 'task': str(task),
+ 'cb': str(task._result_callback)}
+ LOG.exception(msg)
+
+ LOG.debug(_("Task %(task)s return %(status)s") % {
+ 'task': str(task),
+ 'status': task.status})
+
+ task._finished()
+
+ def _check_pending_tasks(self):
+ """Check all pending tasks status."""
+ for resource_id in self._tasks.keys():
+ if self._monitor_stop:
+ # looping call is asked to stop, return now
+ return
+
+ tasks = self._tasks[resource_id]
+ # only the first task is executed and pending
+ task = tasks[0]
+ try:
+ status = task._status_callback(task)
+ except Exception:
+ msg = _("Task %(task)s encountered exception in %(cb)s") % {
+ 'task': str(task),
+ 'cb': str(task._status_callback)}
+ LOG.exception(msg)
+ status = TaskStatus.ERROR
+ task._update_status(status)
+ if status != TaskStatus.PENDING:
+ self._dequeue(task, True)
+
+ def _enqueue(self, task):
+ if task.resource_id in self._tasks:
+ # append to existing resource queue for ordered processing
+ self._tasks[task.resource_id].append(task)
+ else:
+ # put the task to a new resource queue
+ tasks = collections.deque()
+ tasks.append(task)
+ self._tasks[task.resource_id] = tasks
+
+ def _dequeue(self, task, run_next):
+ self._result(task)
+ tasks = self._tasks[task.resource_id]
+ tasks.remove(task)
+ if not tasks:
+ # no more tasks for this resource
+ del self._tasks[task.resource_id]
+ return
+
+ if run_next:
+ # process next task for this resource
+ while tasks:
+ task = tasks[0]
+ status = self._execute(task)
+ if status == TaskStatus.PENDING:
+ break
+ self._dequeue(task, False)
+
+ def _abort(self):
+ """Abort all tasks."""
+ for resource_id in self._tasks.keys():
+ tasks = list(self._tasks[resource_id])
+ for task in tasks:
+ task._update_status(TaskStatus.ABORT)
+ self._dequeue(task, False)
+
+ def _get_task(self):
+ """Get task request."""
+ while True:
+ for t in self._tasks_queue:
+ return self._tasks_queue.popleft()
+ self._req.wait()
+ self._req.reset()
+
+ def run(self):
+ while True:
+ try:
+ # get a task from queue, or timeout for periodic status check
+ task = self._get_task()
+ if task.resource_id in self._tasks:
+ # this resource already has some tasks under processing,
+ # append the task to same queue for ordered processing
+ self._enqueue(task)
+ continue
+
+ status = self._execute(task)
+
+ if status != TaskStatus.PENDING:
+ self._result(task)
+ continue
+
+ self._enqueue(task)
+ except greenlet.GreenletExit:
+ break
+ except Exception:
+ LOG.exception(_("TaskManager terminated"))
+ break
+
+ self._monitor.stop()
+ if self._monitor_busy:
+ self._monitor_stop = event.Event()
+ self._monitor_stop.wait()
+ self._monitor_stop = None
+ self._abort()
+ self._stopped.send()
+
+ def add(self, task):
+ task.id = uuid.uuid1()
+ self._tasks_queue.append(task)
+ if not self._req.ready():
+ self._req.send()
+ return task.id
+
+ def stop(self):
+ if not self._thread:
+ return
+ self._thread.kill()
+ self._stopped.wait()
+ self._thread = None
+
+ def has_pending_task(self):
+ if self._tasks_queue:
+ return True
+
+ if self._tasks:
+ return True
+
+ return False
+
+ def show_pending_tasks(self):
+ for task in self._tasks_queue:
+ print str(task)
+ for resource, tasks in self._tasks.iteritems():
+ for task in tasks:
+ print str(task)
+
+ def count(self):
+ count = 0
+ for resource_id, tasks in self._tasks.iteritems():
+ count += len(tasks)
+ return count
+
+ def start(self, interval=None):
+ def _inner():
+ self.run()
+
+ def _loopingcall_callback():
+ try:
+ self._monitor_busy = True
+ self._check_pending_tasks()
+ self._monitor_busy = False
+ if self._monitor_stop:
+ self._monitor_stop.send()
+ except Exception:
+ LOG.exception(_("Exception in _check_pending_tasks"))
+
+ if self._thread:
+ return self
+
+ if interval is None or interval == 0:
+ interval = self._interval
+
+ self._thread = greenthread.spawn(_inner)
+ self._monitor = loopingcall.FixedIntervalLoopingCall(
+ _loopingcall_callback)
+ self._monitor.start(interval / 1000.0,
+ interval / 1000.0)
+
+ return self
+
+ @classmethod
+ def set_default_interval(cls, interval):
+ cls._default_interval = interval
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 VMware, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: linb, VMware
+
+from neutron.openstack.common import jsonutils
+from neutron.openstack.common import log as logging
+from neutron.plugins.nicira.vshield.common import VcnsApiClient
+
+LOG = logging.getLogger(__name__)
+
+HTTP_GET = "GET"
+HTTP_POST = "POST"
+HTTP_DELETE = "DELETE"
+HTTP_PUT = "PUT"
+URI_PREFIX = "/api/4.0/edges"
+
+
+class Vcns(object):
+
+ def __init__(self, address, user, password):
+ self.address = address
+ self.user = user
+ self.password = password
+ self.jsonapi_client = VcnsApiClient.VcnsApiHelper(address, user,
+ password, 'json')
+ # TODO(fank): remove this after json syntax is fixed on VSM
+ self.xmlapi_client = VcnsApiClient.VcnsApiHelper(address, user,
+ password, 'xml')
+
+ def do_request(self, method, uri, params=None, format='json', **kwargs):
+ LOG.debug(_("VcnsApiHelper('%(method)s', '%(uri)s', '%(body)s')"), {
+ 'method': method,
+ 'uri': uri,
+ 'body': jsonutils.dumps(params)})
+ if format == 'json':
+ header, content = self.jsonapi_client.request(method, uri, params)
+ else:
+ header, content = self.xmlapi_client.request(method, uri, params)
+ LOG.debug(_("Header: '%s'"), header)
+ LOG.debug(_("Content: '%s'"), content)
+ if content == '':
+ return header, {}
+ if kwargs.get('decode', True):
+ content = jsonutils.loads(content)
+ return header, content
+
+ def deploy_edge(self, request):
+ uri = URI_PREFIX + "?async=true"
+ return self.do_request(HTTP_POST, uri, request, decode=False)
+
+ def get_edge_id(self, job_id):
+ uri = URI_PREFIX + "/jobs/%s" % job_id
+ return self.do_request(HTTP_GET, uri, decode=True)
+
+ def get_edge_deploy_status(self, edge_id):
+ uri = URI_PREFIX + "/%s/status?getlatest=false" % edge_id
+ return self.do_request(HTTP_GET, uri, decode="True")
+
+ def delete_edge(self, edge_id):
+ uri = "%s/%s" % (URI_PREFIX, edge_id)
+ return self.do_request(HTTP_DELETE, uri)
+
+ def update_interface(self, edge_id, vnic):
+ uri = "%s/%s/vnics/%d" % (URI_PREFIX, edge_id, vnic['index'])
+ return self.do_request(HTTP_PUT, uri, vnic, decode=True)
+
+ def get_nat_config(self, edge_id):
+ uri = "%s/%s/nat/config" % (URI_PREFIX, edge_id)
+ return self.do_request(HTTP_GET, uri, decode=True)
+
+ def update_nat_config(self, edge_id, nat):
+ uri = "%s/%s/nat/config" % (URI_PREFIX, edge_id)
+ return self.do_request(HTTP_PUT, uri, nat, decode=True)
+
+ def delete_nat_rule(self, edge_id, rule_id):
+ uri = "%s/%s/nat/config/rules/%s" % (URI_PREFIX, edge_id, rule_id)
+ return self.do_request(HTTP_DELETE, uri, decode=True)
+
+ def get_edge_status(self, edge_id):
+ uri = "%s/%s/status?getlatest=false" % (URI_PREFIX, edge_id)
+ return self.do_request(HTTP_GET, uri, decode=True)
+
+ def get_edges(self):
+ uri = URI_PREFIX
+ return self.do_request(HTTP_GET, uri, decode=True)
+
+ def update_routes(self, edge_id, routes):
+ uri = "%s/%s/routing/config/static" % (URI_PREFIX, edge_id)
+ return self.do_request(HTTP_PUT, uri, routes, format='xml')
+
+ def create_lswitch(self, lsconfig):
+ uri = "/api/ws.v1/lswitch"
+ return self.do_request(HTTP_POST, uri, lsconfig, decode=True)
+
+ def delete_lswitch(self, lswitch_id):
+ uri = "/api/ws.v1/lswitch/%s" % lswitch_id
+ return self.do_request(HTTP_DELETE, uri)
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 VMware, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: linb, VMware
+
+from oslo.config import cfg
+
+from neutron.plugins.nicira.common import config # noqa
+from neutron.plugins.nicira.vshield import edge_appliance_driver
+from neutron.plugins.nicira.vshield.tasks import tasks
+from neutron.plugins.nicira.vshield import vcns
+
+
+class VcnsDriver(edge_appliance_driver.EdgeApplianceDriver):
+ def __init__(self, callbacks):
+ super(VcnsDriver, self).__init__()
+
+ self.callbacks = callbacks
+ self.vcns_uri = cfg.CONF.vcns.manager_uri
+ self.vcns_user = cfg.CONF.vcns.user
+ self.vcns_passwd = cfg.CONF.vcns.password
+ self.datacenter_moid = cfg.CONF.vcns.datacenter_moid
+ self.deployment_container_id = cfg.CONF.vcns.deployment_container_id
+ self.resource_pool_id = cfg.CONF.vcns.resource_pool_id
+ self.datastore_id = cfg.CONF.vcns.datastore_id
+ self.external_network = cfg.CONF.vcns.external_network
+ interval = cfg.CONF.vcns.task_status_check_interval
+ self.task_manager = tasks.TaskManager(interval)
+ self.task_manager.start()
+
+ self.vcns = vcns.Vcns(self.vcns_uri, self.vcns_user, self.vcns_passwd)
import os
import neutron.plugins.nicira.api_client.client_eventlet as client
+from neutron.plugins.nicira import extensions
import neutron.plugins.nicira.NeutronPlugin as plugin
import neutron.plugins.nicira.NvpApiClient as nvpapi
+from neutron.plugins.nicira.vshield import vcns
nvp_plugin = plugin.NvpPluginV2
api_helper = nvpapi.NVPApiHelper
nvp_client = client.NvpApiClientEventlet
+vcns_class = vcns.Vcns
STUBS_PATH = os.path.join(os.path.dirname(__file__), 'etc')
-NVPEXT_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)),
- "../../plugins/nicira/extensions")
+NVPEXT_PATH = os.path.dirname(extensions.__file__)
NVPAPI_NAME = '%s.%s' % (api_helper.__module__, api_helper.__name__)
PLUGIN_NAME = '%s.%s' % (nvp_plugin.__module__, nvp_plugin.__name__)
CLIENT_NAME = '%s.%s' % (nvp_client.__module__, nvp_client.__name__)
+VCNS_NAME = '%s.%s' % (vcns_class.__module__, vcns_class.__name__)
def get_fake_conf(filename):
--- /dev/null
+[vcns]
+manager_uri = https://fake-host
+user = fake-user
+passwordd = fake-password
+datacenter_moid = fake-moid
+resource_pool_id = fake-resgroup
+datastore_id = fake-datastore
+external_network = fake-ext-net
+task_status_check_interval = 100
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 OpenStack Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from eventlet import greenthread
+import mock
+
+from neutron.common import config as n_config
+from neutron.plugins.nicira.vshield.common import (
+ constants as vcns_const)
+from neutron.plugins.nicira.vshield.common.constants import RouterStatus
+from neutron.plugins.nicira.vshield.tasks.constants import TaskState
+from neutron.plugins.nicira.vshield.tasks.constants import TaskStatus
+from neutron.plugins.nicira.vshield.tasks import tasks as ts
+from neutron.plugins.nicira.vshield import vcns_driver
+from neutron.tests import base
+from neutron.tests.unit.nicira import get_fake_conf
+from neutron.tests.unit.nicira import VCNS_NAME
+from neutron.tests.unit.nicira.vshield import fake_vcns
+
+VCNS_CONFIG_FILE = get_fake_conf("vcns.ini.test")
+
+ts.TaskManager.set_default_interval(100)
+
+
+class VcnsDriverTaskManagerTestCase(base.BaseTestCase):
+
+ def setUp(self):
+ super(VcnsDriverTaskManagerTestCase, self).setUp()
+ self.manager = ts.TaskManager()
+ self.manager.start(100)
+
+ def tearDown(self):
+ self.manager.stop()
+ super(VcnsDriverTaskManagerTestCase, self).tearDown()
+
+ def _test_task_manager_task_process_state(self, sync_exec=False):
+ def _task_failed(task, reason):
+ task.userdata['result'] = False
+ task.userdata['error'] = reason
+
+ def _check_state(task, exp_state):
+ if not task.userdata.get('result', True):
+ return False
+
+ state = task.userdata['state']
+ if state != exp_state:
+ msg = "state %d expect %d" % (
+ state, exp_state)
+ _task_failed(task, msg)
+ return False
+
+ task.userdata['state'] = state + 1
+ return True
+
+ def _exec(task):
+ if not _check_state(task, 1):
+ return TaskStatus.ERROR
+
+ if task.userdata['sync_exec']:
+ return TaskStatus.COMPLETED
+ else:
+ return TaskStatus.PENDING
+
+ def _status(task):
+ if task.userdata['sync_exec']:
+ _task_failed(task, "_status callback triggered")
+
+ state = task.userdata['state']
+ if state == 3:
+ _check_state(task, 3)
+ return TaskStatus.PENDING
+ else:
+ _check_state(task, 4)
+ return TaskStatus.COMPLETED
+
+ def _result(task):
+ if task.userdata['sync_exec']:
+ exp_state = 3
+ else:
+ exp_state = 5
+
+ _check_state(task, exp_state)
+
+ def _start_monitor(task):
+ _check_state(task, 0)
+
+ def _executed_monitor(task):
+ _check_state(task, 2)
+
+ def _result_monitor(task):
+ if task.userdata['sync_exec']:
+ exp_state = 4
+ else:
+ exp_state = 6
+
+ if _check_state(task, exp_state):
+ task.userdata['result'] = True
+ else:
+ task.userdata['result'] = False
+
+ userdata = {
+ 'state': 0,
+ 'sync_exec': sync_exec
+ }
+ task = ts.Task('name', 'res', _exec, _status, _result, userdata)
+ task.add_start_monitor(_start_monitor)
+ task.add_executed_monitor(_executed_monitor)
+ task.add_result_monitor(_result_monitor)
+
+ self.manager.add(task)
+
+ task.wait(TaskState.RESULT)
+
+ if 'error' in userdata:
+ print userdata['error']
+
+ self.assertTrue(userdata['result'])
+
+ def test_task_manager_task_sync_exec_process_state(self):
+ self._test_task_manager_task_process_state(sync_exec=True)
+
+ def test_task_manager_task_async_exec_process_state(self):
+ self._test_task_manager_task_process_state(sync_exec=False)
+
+ def test_task_manager_task_ordered_process(self):
+ def _task_failed(task, reason):
+ task.userdata['result'] = False
+ task.userdata['error'] = reason
+
+ def _exec(task):
+ task.userdata['executed'] = True
+ return TaskStatus.PENDING
+
+ def _status(task):
+ return TaskStatus.COMPLETED
+
+ def _result(task):
+ next_task = task.userdata.get('next')
+ if next_task:
+ if next_task.userdata.get('executed'):
+ _task_failed(next_task, "executed premature")
+ if task.userdata.get('result', True):
+ task.userdata['result'] = True
+
+ tasks = []
+ prev = None
+ last_task = None
+ for i in range(5):
+ name = "name-%d" % i
+ task = ts.Task(name, 'res', _exec, _status, _result, {})
+ tasks.append(task)
+ if prev:
+ prev.userdata['next'] = task
+ prev = task
+ last_task = task
+
+ for task in tasks:
+ self.manager.add(task)
+
+ last_task.wait(TaskState.RESULT)
+
+ for task in tasks:
+ if 'error' in task.userdata:
+ print "Task %s failed: " % (
+ tasks.name, tasks.userdata['error'])
+
+ for task in tasks:
+ self.assertTrue(task.userdata['result'])
+
+ def test_task_manager_task_parallel_process(self):
+ tasks = []
+
+ def _exec(task):
+ task.userdata['executed'] = True
+ return TaskStatus.PENDING
+
+ def _status(task):
+ for t in tasks:
+ if not t.userdata.get('executed'):
+ t.userdata['resut'] = False
+ return TaskStatus.COMPLETED
+
+ def _result(task):
+ if (task.userdata.get('result') is None and
+ task.status == TaskStatus.COMPLETED):
+ task.userdata['result'] = True
+ else:
+ task.userdata['result'] = False
+
+ for i in range(5):
+ name = "name-%d" % i
+ res = 'resource-%d' % i
+ task = ts.Task(name, res, _exec, _status, _result, {})
+ tasks.append(task)
+ self.manager.add(task)
+
+ for task in tasks:
+ task.wait(TaskState.RESULT)
+ self.assertTrue(task.userdata['result'])
+
+ def test_task_manager_stop(self):
+ def _exec(task):
+ return TaskStatus.PENDING
+
+ def _status(task):
+ greenthread.sleep(0.1)
+ return TaskStatus.PENDING
+
+ def _result(task):
+ pass
+
+ manager = ts.TaskManager().start(100)
+
+ alltasks = {}
+ for i in range(100):
+ res = 'res-%d' % i
+ tasks = []
+ for i in range(100):
+ task = ts.Task('name', res, _exec, _status, _result)
+ manager.add(task)
+ tasks.append(task)
+ alltasks[res] = tasks
+
+ greenthread.sleep(2)
+ manager.stop()
+ for res, tasks in alltasks.iteritems():
+ for task in tasks:
+ self.assertEqual(task.status, TaskStatus.ABORT)
+
+
+class VcnsDriverTestCase(base.BaseTestCase):
+
+ def vcns_patch(self):
+ instance = self.mock_vcns.start()
+ instance.return_value.deploy_edge.side_effect = self.fc.deploy_edge
+ instance.return_value.get_edge_id.side_effect = self.fc.get_edge_id
+ instance.return_value.get_edge_deploy_status.side_effect = (
+ self.fc.get_edge_deploy_status)
+ instance.return_value.delete_edge.side_effect = self.fc.delete_edge
+ instance.return_value.update_interface.side_effect = (
+ self.fc.update_interface)
+ instance.return_value.get_nat_config.side_effect = (
+ self.fc.get_nat_config)
+ instance.return_value.update_nat_config.side_effect = (
+ self.fc.update_nat_config)
+ instance.return_value.delete_nat_rule.side_effect = (
+ self.fc.delete_nat_rule)
+ instance.return_value.get_edge_status.side_effect = (
+ self.fc.get_edge_status)
+ instance.return_value.get_edges.side_effect = self.fc.get_edges
+ instance.return_value.update_routes.side_effect = (
+ self.fc.update_routes)
+ instance.return_value.create_lswitch.side_effect = (
+ self.fc.create_lswitch)
+ instance.return_value.delete_lswitch.side_effect = (
+ self.fc.delete_lswitch)
+
+ def setUp(self):
+ super(VcnsDriverTestCase, self).setUp()
+
+ n_config.parse(['--config-file', VCNS_CONFIG_FILE])
+
+ self.fc = fake_vcns.FakeVcns()
+ self.mock_vcns = mock.patch(VCNS_NAME, autospec=True)
+ self.vcns_patch()
+
+ self.addCleanup(self.fc.reset_all)
+ self.addCleanup(self.mock_vcns.stop)
+
+ self.vcns_driver = vcns_driver.VcnsDriver(self)
+
+ self.edge_id = None
+ self.result = None
+
+ def _deploy_edge(self):
+ task = self.vcns_driver.deploy_edge(
+ 'router-id', 'myedge', 'internal-network', {}, wait_for_exec=True)
+ self.assertEqual(self.edge_id, 'edge-1')
+ task.wait(TaskState.RESULT)
+ return task
+
+ def edge_deploy_started(self, task):
+ self.edge_id = task.userdata['edge_id']
+
+ def edge_deploy_result(self, task):
+ if task.status == TaskStatus.COMPLETED:
+ task.userdata['jobdata']['edge_deploy_result'] = True
+
+ def edge_delete_result(self, task):
+ if task.status == TaskStatus.COMPLETED:
+ task.userdata['jobdata']['edge_delete_result'] = True
+
+ def snat_create_result(self, task):
+ if task.status == TaskStatus.COMPLETED:
+ task.userdata['jobdata']['snat_create_result'] = True
+
+ def snat_delete_result(self, task):
+ if task.status == TaskStatus.COMPLETED:
+ task.userdata['jobdata']['snat_delete_result'] = True
+
+ def dnat_create_result(self, task):
+ if task.status == TaskStatus.COMPLETED:
+ task.userdata['jobdata']['dnat_create_result'] = True
+
+ def dnat_delete_result(self, task):
+ if task.status == TaskStatus.COMPLETED:
+ task.userdata['jobdata']['dnat_delete_result'] = True
+
+ def nat_update_result(self, task):
+ if task.status == TaskStatus.COMPLETED:
+ task.userdata['jobdata']['nat_update_result'] = True
+
+ def routes_update_result(self, task):
+ if task.status == TaskStatus.COMPLETED:
+ task.userdata['jobdata']['routes_update_result'] = True
+
+ def interface_update_result(self, task):
+ if task.status == TaskStatus.COMPLETED:
+ task.userdata['jobdata']['interface_update_result'] = True
+
+ def test_deploy_edge(self):
+ jobdata = {}
+ task = self.vcns_driver.deploy_edge(
+ 'router-id', 'myedge', 'internal-network', jobdata=jobdata,
+ wait_for_exec=True)
+ self.assertEqual(self.edge_id, 'edge-1')
+ task.wait(TaskState.RESULT)
+ self.assertEqual(task.status, TaskStatus.COMPLETED)
+ self.assertTrue(jobdata.get('edge_deploy_result'))
+
+ def test_deploy_edge_fail(self):
+ self.vcns_driver.deploy_edge(
+ 'router-1', 'myedge', 'internal-network', {}, wait_for_exec=True)
+ task = self.vcns_driver.deploy_edge(
+ 'router-2', 'myedge', 'internal-network', {}, wait_for_exec=True)
+ task.wait(TaskState.RESULT)
+ self.assertEqual(task.status, TaskStatus.ERROR)
+
+ def test_get_edge_status(self):
+ self._deploy_edge()
+ status = self.vcns_driver.get_edge_status(self.edge_id)
+ self.assertEqual(status, RouterStatus.ROUTER_STATUS_ACTIVE)
+
+ def test_get_edges(self):
+ self._deploy_edge()
+ edges = self.vcns_driver.get_edges_statuses()
+ found = False
+ for edge_id, status in edges.iteritems():
+ if edge_id == self.edge_id:
+ found = True
+ break
+ self.assertTrue(found)
+
+ def _create_nat_rule(self, edge_id, action, org, translated):
+ jobdata = {}
+ if action == 'snat':
+ task = self.vcns_driver.create_snat_rule(
+ 'router-id', edge_id, org, translated, jobdata=jobdata)
+ key = 'snat_create_result'
+ else:
+ task = self.vcns_driver.create_dnat_rule(
+ 'router-id', edge_id, org, translated, jobdata=jobdata)
+ key = 'dnat_create_result'
+ task.wait(TaskState.RESULT)
+ self.assertTrue(jobdata.get(key))
+
+ def _delete_nat_rule(self, edge_id, action, addr):
+ jobdata = {}
+ if action == 'snat':
+ task = self.vcns_driver.delete_snat_rule(
+ 'router-id', edge_id, addr, jobdata=jobdata)
+ key = 'snat_delete_result'
+ else:
+ task = self.vcns_driver.delete_dnat_rule(
+ 'router-id', edge_id, addr, jobdata=jobdata)
+ key = 'dnat_delete_result'
+ task.wait(TaskState.RESULT)
+ self.assertTrue(jobdata.get(key))
+
+ def _test_create_nat_rule(self, action):
+ self._deploy_edge()
+ addr = '192.168.1.1'
+ translated = '10.0.0.1'
+ self._create_nat_rule(self.edge_id, action, addr, translated)
+
+ natcfg = self.vcns_driver.get_nat_config(self.edge_id)
+ for rule in natcfg['rules']['natRulesDtos']:
+ if (rule['originalAddress'] == addr and
+ rule['translatedAddress'] == translated and
+ rule['action'] == action):
+ break
+ else:
+ self.assertTrue(False)
+
+ def _test_delete_nat_rule(self, action):
+ self._deploy_edge()
+ addr = '192.168.1.1'
+ translated = '10.0.0.1'
+ self._create_nat_rule(self.edge_id, action, addr, translated)
+ if action == 'snat':
+ self._delete_nat_rule(self.edge_id, action, addr)
+ else:
+ self._delete_nat_rule(self.edge_id, action, translated)
+ natcfg = self.vcns_driver.get_nat_config(self.edge_id)
+ for rule in natcfg['rules']['natRulesDtos']:
+ if (rule['originalAddress'] == addr and
+ rule['translatedAddress'] == translated and
+ rule['action'] == action):
+ self.assertTrue(False)
+ break
+
+ def test_create_snat_rule(self):
+ self._test_create_nat_rule('snat')
+
+ def test_delete_snat_rule(self):
+ self._test_delete_nat_rule('snat')
+
+ def test_create_dnat_rule(self):
+ self._test_create_nat_rule('dnat')
+
+ def test_delete_dnat_rule(self):
+ self._test_delete_nat_rule('dnat')
+
+ def test_update_nat_rules(self):
+ self._deploy_edge()
+ jobdata = {}
+ snats = [{
+ 'src': '192.168.1.0/24',
+ 'translated': '10.0.0.1'
+ }, {
+ 'src': '192.168.2.0/24',
+ 'translated': '10.0.0.2'
+ }, {
+ 'src': '192.168.3.0/24',
+ 'translated': '10.0.0.3'
+ }
+ ]
+ dnats = [{
+ 'dst': '100.0.0.4',
+ 'translated': '192.168.1.1'
+ }, {
+ 'dst': '100.0.0.5',
+ 'translated': '192.168.2.1'
+ }
+ ]
+ task = self.vcns_driver.update_nat_rules(
+ 'router-id', self.edge_id, snats, dnats, jobdata=jobdata)
+ task.wait(TaskState.RESULT)
+ self.assertTrue(jobdata.get('nat_update_result'))
+
+ natcfg = self.vcns_driver.get_nat_config(self.edge_id)
+ rules = natcfg['rules']['natRulesDtos']
+ self.assertEqual(len(rules), 2 * len(dnats) + len(snats))
+ self.natEquals(rules[0], dnats[0])
+ self.natEquals(rules[1], self.snat_for_dnat(dnats[0]))
+ self.natEquals(rules[2], dnats[1])
+ self.natEquals(rules[3], self.snat_for_dnat(dnats[1]))
+ self.natEquals(rules[4], snats[0])
+ self.natEquals(rules[5], snats[1])
+ self.natEquals(rules[6], snats[2])
+
+ def snat_for_dnat(self, dnat):
+ return {
+ 'src': dnat['translated'],
+ 'translated': dnat['dst']
+ }
+
+ def natEquals(self, rule, exp):
+ addr = exp.get('src')
+ if not addr:
+ addr = exp.get('dst')
+
+ self.assertEqual(rule['originalAddress'], addr)
+ self.assertEqual(rule['translatedAddress'], exp['translated'])
+
+ def test_update_routes(self):
+ self._deploy_edge()
+ jobdata = {}
+ routes = [{
+ 'cidr': '192.168.1.0/24',
+ 'nexthop': '169.254.2.1'
+ }, {
+ 'cidr': '192.168.2.0/24',
+ 'nexthop': '169.254.2.1'
+ }, {
+ 'cidr': '192.168.3.0/24',
+ 'nexthop': '169.254.2.1'
+ }
+ ]
+ task = self.vcns_driver.update_routes(
+ 'router-id', self.edge_id, '10.0.0.1', routes, jobdata=jobdata)
+ task.wait(TaskState.RESULT)
+ self.assertTrue(jobdata.get('routes_update_result'))
+
+ def test_update_interface(self):
+ self._deploy_edge()
+ jobdata = {}
+ task = self.vcns_driver.update_interface(
+ 'router-id', self.edge_id, vcns_const.EXTERNAL_VNIC_INDEX,
+ 'network-id', address='100.0.0.3', netmask='255.255.255.0',
+ jobdata=jobdata)
+ task.wait(TaskState.RESULT)
+ self.assertTrue(jobdata.get('interface_update_result'))
+
+ def test_delete_edge(self):
+ self._deploy_edge()
+ jobdata = {}
+ task = self.vcns_driver.delete_edge(
+ 'router-id', self.edge_id, jobdata=jobdata)
+ task.wait(TaskState.RESULT)
+ self.assertTrue(jobdata.get('edge_delete_result'))
+
+ def test_create_lswitch(self):
+ tz_config = [{
+ 'transport_zone_uuid': 'tz-uuid'
+ }]
+ lswitch = self.vcns_driver.create_lswitch('lswitch', tz_config)
+ self.assertEqual(lswitch['display_name'], 'lswitch')
+ self.assertEqual(lswitch['type'], 'LogicalSwitchConfig')
+ self.assertIn('uuid', lswitch)
+
+ def test_delete_lswitch(self):
+ tz_config = {
+ 'transport_zone_uuid': 'tz-uuid'
+ }
+ lswitch = self.vcns_driver.create_lswitch('lswitch', tz_config)
+ self.vcns_driver.delete_lswitch(lswitch['uuid'])
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 VMware, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: linb, VMware
+
+import copy
+import json
+
+from neutron.openstack.common import uuidutils
+
+
+class FakeVcns(object):
+
+ def __init__(self, unique_router_name=True):
+ self._jobs = {}
+ self._job_idx = 0
+ self._edges = {}
+ self._edge_idx = 0
+ self._lswitches = {}
+ self._unique_router_name = unique_router_name
+ self._fake_nvpapi = None
+
+ def set_fake_nvpapi(self, fake_nvpapi):
+ self._fake_nvpapi = fake_nvpapi
+
+ def _validate_edge_name(self, name):
+ for edge_id, edge in self._edges.iteritems():
+ if edge['name'] == name:
+ return False
+ return True
+
+ def deploy_edge(self, request):
+ if (self._unique_router_name and
+ not self._validate_edge_name(request['name'])):
+ header = {
+ 'status': 400
+ }
+ msg = ('Edge name should be unique for tenant. Edge %s '
+ 'already exists for default tenant.') % request['name']
+ response = {
+ 'details': msg,
+ 'errorCode': 10085,
+ 'rootCauseString': None,
+ 'moduleName': 'vShield Edge',
+ 'errorData': None
+ }
+ return (header, json.dumps(response))
+
+ self._job_idx = self._job_idx + 1
+ job_id = "jobdata-%d" % self._job_idx
+ self._edge_idx = self._edge_idx + 1
+ edge_id = "edge-%d" % self._edge_idx
+ self._jobs[job_id] = edge_id
+ self._edges[edge_id] = {
+ 'name': request['name'],
+ 'request': request,
+ 'nat_rules': None,
+ 'nat_rule_id': 0
+ }
+ header = {
+ 'status': 200,
+ 'location': 'https://host/api/4.0/jobs/%s' % job_id
+ }
+ response = ''
+ return (header, response)
+
+ def get_edge_id(self, job_id):
+ if job_id not in self._jobs:
+ raise Exception(_("Job %s does not nexist") % job_id)
+
+ header = {
+ 'status': 200
+ }
+ response = {
+ 'edgeId': self._jobs[job_id]
+ }
+ return (header, response)
+
+ def get_edge_deploy_status(self, edge_id):
+ if edge_id not in self._edges:
+ raise Exception(_("Edge %s does not exist") % edge_id)
+ header = {
+ 'status': 200,
+ }
+ response = {
+ 'systemStatus': 'good'
+ }
+ return (header, response)
+
+ def delete_edge(self, edge_id):
+ if edge_id not in self._edges:
+ raise Exception(_("Edge %s does not exist") % edge_id)
+ del self._edges[edge_id]
+ header = {
+ 'status': 200
+ }
+ response = ''
+ return (header, response)
+
+ def update_interface(self, edge_id, vnic):
+ header = {
+ 'status': 200
+ }
+ response = ''
+ return (header, response)
+
+ def get_nat_config(self, edge_id):
+ if edge_id not in self._edges:
+ raise Exception(_("Edge %s does not exist") % edge_id)
+ edge = self._edges[edge_id]
+ rules = edge['nat_rules']
+ if rules is None:
+ rules = {
+ 'rules': {
+ 'natRulesDtos': []
+ },
+ 'version': 1
+ }
+ header = {
+ 'status': 200
+ }
+ rules['version'] = 1
+ return (header, rules)
+
+ def update_nat_config(self, edge_id, nat):
+ if edge_id not in self._edges:
+ raise Exception(_("Edge %s does not exist") % edge_id)
+ edge = self._edges[edge_id]
+ max_rule_id = edge['nat_rule_id']
+ rules = copy.deepcopy(nat)
+ for rule in rules['rules']['natRulesDtos']:
+ rule_id = rule.get('ruleId', 0)
+ if rule_id > max_rule_id:
+ max_rule_id = rule_id
+ for rule in rules['rules']['natRulesDtos']:
+ if 'ruleId' not in rule:
+ max_rule_id = max_rule_id + 1
+ rule['ruleId'] = max_rule_id
+ edge['nat_rules'] = rules
+ edge['nat_rule_id'] = max_rule_id
+ header = {
+ 'status': 200
+ }
+ response = ''
+ return (header, response)
+
+ def delete_nat_rule(self, edge_id, rule_id):
+ if edge_id not in self._edges:
+ raise Exception(_("Edge %s does not exist") % edge_id)
+
+ edge = self._edges[edge_id]
+ rules = edge['nat_rules']
+ rule_to_delete = None
+ for rule in rules['rules']['natRulesDtos']:
+ if rule_id == rule['ruleId']:
+ rule_to_delete = rule
+ break
+ if rule_to_delete is None:
+ raise Exception(_("Rule id %d doest not exist") % rule_id)
+
+ rules['rules']['natRulesDtos'].remove(rule_to_delete)
+
+ header = {
+ 'status': 200
+ }
+ response = ''
+ return (header, response)
+
+ def get_edge_status(self, edge_id):
+ if edge_id not in self._edges:
+ raise Exception(_("Edge %s does not exist") % edge_id)
+
+ header = {
+ 'status': 200
+ }
+ response = {
+ 'edgeStatus': 'GREEN'
+ }
+ return (header, response)
+
+ def get_edges(self):
+ header = {
+ 'status': 200
+ }
+ edges = []
+ for edge_id in self._edges:
+ edges.append({
+ 'id': edge_id,
+ 'edgeStatus': 'GREEN'
+ })
+ response = {
+ 'edgePage': {
+ 'data': edges
+ }
+ }
+ return (header, response)
+
+ def update_routes(self, edge_id, routes):
+ header = {
+ 'status': 200
+ }
+ response = ''
+ return (header, response)
+
+ def create_lswitch(self, lsconfig):
+ # The lswitch is created via VCNS API so the fake nvpapi wont
+ # see it. Added to fake nvpapi here.
+ if self._fake_nvpapi:
+ lswitch = self._fake_nvpapi._add_lswitch(json.dumps(lsconfig))
+ else:
+ lswitch = lsconfig
+ lswitch['uuid'] = uuidutils.generate_uuid()
+ self._lswitches[lswitch['uuid']] = lswitch
+ header = {
+ 'status': 200
+ }
+ lswitch['_href'] = '/api/ws.v1/lswitch/%s' % lswitch['uuid']
+ return (header, lswitch)
+
+ def delete_lswitch(self, id):
+ if id not in self._lswitches:
+ raise Exception(_("Lswitch %s does not exist") % id)
+ del self._lswitches[id]
+ if self._fake_nvpapi:
+ # TODO(fank): fix the hack
+ del self._fake_nvpapi._fake_lswitch_dict[id]
+ header = {
+ 'status': 200
+ }
+ response = ''
+ return (header, response)
+
+ def reset_all(self):
+ self._jobs.clear()
+ self._edges.clear()
+ self._lswitches.clear()