]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
VCNS driver implementation
authorKaiwei Fan <fank@vmware.com>
Tue, 20 Aug 2013 21:28:58 +0000 (14:28 -0700)
committerKaiwei Fan <fank@vmware.com>
Thu, 5 Sep 2013 00:16:41 +0000 (17:16 -0700)
Implement API/driver interface for configuring vShield Edge Appliance.

Currently implemented functions:
    - Deploy an Edge
    - Destroy an Edge
    - Configuring interfaces
    - Configuring SNAT/DNAT rules
    - Configuring default gateway and static routes
    - Query Edge status
    - Task-based asynchronous model
    - Allow old routes/nat config to be skipped if new updates are coming

Implements: blueprint vcns-driver
Change-Id: I881bde907f4c90de4c919d008b76b8c2a2d0e1fd

19 files changed:
etc/neutron/plugins/nicira/nvp.ini
neutron/plugins/nicira/common/config.py
neutron/plugins/nicira/vshield/__init__.py [new file with mode: 0644]
neutron/plugins/nicira/vshield/common/VcnsApiClient.py [new file with mode: 0644]
neutron/plugins/nicira/vshield/common/__init__.py [new file with mode: 0644]
neutron/plugins/nicira/vshield/common/constants.py [new file with mode: 0644]
neutron/plugins/nicira/vshield/common/exceptions.py [new file with mode: 0644]
neutron/plugins/nicira/vshield/edge_appliance_driver.py [new file with mode: 0644]
neutron/plugins/nicira/vshield/tasks/__init__.py [new file with mode: 0644]
neutron/plugins/nicira/vshield/tasks/constants.py [new file with mode: 0755]
neutron/plugins/nicira/vshield/tasks/tasks.py [new file with mode: 0755]
neutron/plugins/nicira/vshield/vcns.py [new file with mode: 0644]
neutron/plugins/nicira/vshield/vcns_driver.py [new file with mode: 0644]
neutron/tests/unit/nicira/__init__.py
neutron/tests/unit/nicira/etc/vcns.ini.test [new file with mode: 0644]
neutron/tests/unit/nicira/test_vcns_driver.py [new file with mode: 0644]
neutron/tests/unit/nicira/vshield/__init__.py [new file with mode: 0644]
neutron/tests/unit/nicira/vshield/common/__init__.py [new file with mode: 0644]
neutron/tests/unit/nicira/vshield/fake_vcns.py [new file with mode: 0644]

index a5e0059b9db3a0ca6bbfc634b8073921667e3d9b..c810423553314a9533ad6f7fe3f59ae85f8f1bcd 100644 (file)
 # that using the minimum chunk size will cause the interval between two
 # requests to be less than min_sync_req_delay
 # min_chunk_size = 500
+
+[vcns]
+# URL for VCNS manager
+# manager_uri = https://management_ip
+
+# User name for VCNS manager
+# user = admin
+
+# Password for VCNS manager
+# password = default
+
+# (Optional) Datacenter ID for Edge deployment
+# datacenter_moid =
+
+# (Optional) Deployment Container ID for NSX Edge deployment
+# If not specified, either a default global container will be used, or
+# the resource pool and datastore specified below will be used
+# deployment_container_id =
+
+# (Optional) Resource pool ID for NSX Edge deployment
+# resource_pool_id =
+
+# (Optional) Datastore ID for NSX Edge deployment
+# datastore_id =
+
+# (Required) UUID of logic switch for physical network connectivity
+# external_network =
+
+# (Optional) Asynchronous task status check interval
+# default is 2000 (millisecond)
+# task_status_check_interval = 2000
index 88c9ea34887473256f63be14ddd92ff2d6a19b7a..721294e598f04505b706b873d141e255449ef922 100644 (file)
@@ -120,12 +120,43 @@ cluster_opts = [
                       "network connection")),
 ]
 
+DEFAULT_STATUS_CHECK_INTERVAL = 2000
+
+vcns_opts = [
+    cfg.StrOpt('user',
+               default='admin',
+               help=_('User name for vsm')),
+    cfg.StrOpt('password',
+               default='default',
+               secret=True,
+               help=_('Password for vsm')),
+    cfg.StrOpt('manager_uri',
+               help=_('uri for vsm')),
+    cfg.StrOpt('datacenter_moid',
+               help=_('Optional parameter identifying the ID of datacenter '
+                      'to deploy NSX Edges')),
+    cfg.StrOpt('deployment_container_id',
+               help=_('Optional parameter identifying the ID of datastore to '
+                      'deploy NSX Edges')),
+    cfg.StrOpt('resource_pool_id',
+               help=_('Optional parameter identifying the ID of resource to '
+                      'deploy NSX Edges')),
+    cfg.StrOpt('datastore_id',
+               help=_('Optional parameter identifying the ID of datastore to '
+                      'deploy NSX Edges')),
+    cfg.StrOpt('external_network',
+               help=_('Network ID for physical network connectivity')),
+    cfg.IntOpt('task_status_check_interval',
+               default=DEFAULT_STATUS_CHECK_INTERVAL,
+               help=_("Task status check interval"))
+]
+
 # Register the configuration options
 cfg.CONF.register_opts(connection_opts)
 cfg.CONF.register_opts(cluster_opts)
 cfg.CONF.register_opts(nvp_opts, "NVP")
 cfg.CONF.register_opts(sync_opts, "NVP_SYNC")
-
+cfg.CONF.register_opts(vcns_opts, group="vcns")
 # NOTE(armando-migliaccio): keep the following code until we support
 # NVP configuration files in older format (Grizzly or older).
 # ### BEGIN
diff --git a/neutron/plugins/nicira/vshield/__init__.py b/neutron/plugins/nicira/vshield/__init__.py
new file mode 100644 (file)
index 0000000..c020e3b
--- /dev/null
@@ -0,0 +1,16 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 VMware, Inc.
+# All Rights Reserved
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
diff --git a/neutron/plugins/nicira/vshield/common/VcnsApiClient.py b/neutron/plugins/nicira/vshield/common/VcnsApiClient.py
new file mode 100644 (file)
index 0000000..55fc7a2
--- /dev/null
@@ -0,0 +1,84 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 VMware, Inc
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: linb, VMware
+
+import base64
+
+import eventlet
+
+from neutron.openstack.common import jsonutils
+from neutron.plugins.nicira.vshield.common import exceptions
+
+httplib2 = eventlet.import_patched('httplib2')
+
+
+def xmldumps(obj):
+    config = ""
+    if isinstance(obj, dict):
+        for key, value in obj.iteritems():
+            cfg = "<%s>%s</%s>" % (key, xmldumps(value), key)
+            config += cfg
+    elif isinstance(obj, list):
+        for value in obj:
+            config += xmldumps(value)
+    else:
+        config = obj
+
+    return config
+
+
+class VcnsApiHelper(object):
+    errors = {
+        303: exceptions.ResourceRedirect,
+        400: exceptions.RequestBad,
+        403: exceptions.Forbidden,
+        404: exceptions.ResourceNotFound,
+        415: exceptions.MediaTypeUnsupport,
+        503: exceptions.ServiceUnavailable
+    }
+
+    def __init__(self, address, user, password, format='json'):
+        self.authToken = base64.encodestring("%s:%s" % (user, password))
+        self.user = user
+        self.passwd = password
+        self.address = address
+        self.format = format
+        if format == 'json':
+            self.encode = jsonutils.dumps
+        else:
+            self.encode = xmldumps
+
+    def request(self, method, uri, params=None):
+        uri = self.address + uri
+        http = httplib2.Http()
+        http.disable_ssl_certificate_validation = True
+        headers = {
+            'Content-Type': 'application/' + self.format,
+            'Accept': 'application/' + 'json',
+            'Authorization': 'Basic ' + self.authToken
+        }
+        body = self.encode(params) if params else None
+        header, response = http.request(uri, method,
+                                        body=body, headers=headers)
+        status = int(header['status'])
+        if 200 <= status < 300:
+            return header, response
+        if status in self.errors:
+            cls = self.errors[status]
+        else:
+            cls = exceptions.VcnsApiException
+        raise cls(uri=uri, status=status, header=header, response=response)
diff --git a/neutron/plugins/nicira/vshield/common/__init__.py b/neutron/plugins/nicira/vshield/common/__init__.py
new file mode 100644 (file)
index 0000000..5e8da71
--- /dev/null
@@ -0,0 +1,16 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 OpenStack Foundation.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
diff --git a/neutron/plugins/nicira/vshield/common/constants.py b/neutron/plugins/nicira/vshield/common/constants.py
new file mode 100644 (file)
index 0000000..6edbd35
--- /dev/null
@@ -0,0 +1,45 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 OpenStack Foundation.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+EDGE_ID = 'edge_id'
+ROUTER_ID = 'router_id'
+
+# Interface
+EXTERNAL_VNIC_INDEX = 0
+INTERNAL_VNIC_INDEX = 1
+EXTERNAL_VNIC_NAME = "external"
+INTERNAL_VNIC_NAME = "internal"
+
+INTEGRATION_LR_IPADDRESS = "169.254.2.1/28"
+INTEGRATION_EDGE_IPADDRESS = "169.254.2.3"
+INTEGRATION_SUBNET_NETMASK = "255.255.255.240"
+
+# SNAT rule location
+PREPEND = 0
+APPEND = -1
+
+# error code
+VCNS_ERROR_CODE_EDGE_NOT_RUNNING = 10013
+
+
+# router status by number
+class RouterStatus(object):
+    ROUTER_STATUS_ACTIVE = 0
+    ROUTER_STATUS_DOWN = 1
+    ROUTER_STATUS_PENDING_CREATE = 2
+    ROUTER_STATUS_PENDING_DELETE = 3
+    ROUTER_STATUS_ERROR = 4
diff --git a/neutron/plugins/nicira/vshield/common/exceptions.py b/neutron/plugins/nicira/vshield/common/exceptions.py
new file mode 100644 (file)
index 0000000..2e4b210
--- /dev/null
@@ -0,0 +1,64 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 VMware, Inc
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: linb, VMware
+
+from neutron.common import exceptions
+
+
+class VcnsException(exceptions.NeutronException):
+    pass
+
+
+class VcnsGeneralException(VcnsException):
+    def __init__(self, message):
+        self.message = message
+        super(VcnsGeneralException, self).__init__()
+
+
+class VcnsApiException(VcnsException):
+    message = _("An unknown exception %(status)s occurred: %(response)s.")
+
+    def __init__(self, **kwargs):
+        super(VcnsApiException, self).__init__(**kwargs)
+
+        self.status = kwargs.get('status')
+        self.header = kwargs.get('header')
+        self.response = kwargs.get('response')
+
+
+class ResourceRedirect(VcnsApiException):
+    message = _("Resource %(uri)s has been redirected")
+
+
+class RequestBad(VcnsApiException):
+    message = _("Request %(uri)s is Bad, response %(response)s")
+
+
+class Forbidden(VcnsApiException):
+    message = _("Forbidden: %(uri)s")
+
+
+class ResourceNotFound(VcnsApiException):
+    message = _("Resource %(uri)s not found")
+
+
+class MediaTypeUnsupport(VcnsApiException):
+    message = _("Media Type %(uri)s is not supported")
+
+
+class ServiceUnavailable(VcnsApiException):
+    message = _("Service on available: %(uri)s")
diff --git a/neutron/plugins/nicira/vshield/edge_appliance_driver.py b/neutron/plugins/nicira/vshield/edge_appliance_driver.py
new file mode 100644 (file)
index 0000000..026744a
--- /dev/null
@@ -0,0 +1,631 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 VMware, Inc
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: Kaiwei Fan, VMware, Inc.
+# @author: Bo Link, VMware, Inc.
+
+from neutron.openstack.common import jsonutils
+from neutron.openstack.common import log as logging
+from neutron.plugins.nicira.vshield.common import (
+    constants as vcns_const)
+from neutron.plugins.nicira.vshield.common.constants import RouterStatus
+from neutron.plugins.nicira.vshield.common import exceptions
+from neutron.plugins.nicira.vshield.tasks.constants import TaskState
+from neutron.plugins.nicira.vshield.tasks.constants import TaskStatus
+from neutron.plugins.nicira.vshield.tasks import tasks
+
+LOG = logging.getLogger(__name__)
+
+
+class EdgeApplianceDriver(object):
+    def __init__(self):
+        # store the last task per edge that has the latest config
+        self.updated_task = {
+            'nat': {},
+            'route': {},
+        }
+
+    def _assemble_edge(self, name, appliance_size="compact",
+                       deployment_container_id=None, datacenter_moid=None,
+                       enable_aesni=True, hypervisor_assist=False,
+                       enable_fips=False, remote_access=False):
+        edge = {
+            'name': name,
+            'fqdn': name,
+            'hypervisorAssist': hypervisor_assist,
+            'type': 'gatewayServices',
+            'enableAesni': enable_aesni,
+            'enableFips': enable_fips,
+            'cliSettings': {
+                'remoteAccess': remote_access
+            },
+            'appliances': {
+                'applianceSize': appliance_size
+            },
+            'vnics': {
+                'vnics': []
+            }
+        }
+        if deployment_container_id:
+            edge['appliances']['deploymentContainerId'] = (
+                deployment_container_id)
+        if datacenter_moid:
+            edge['datacenterMoid'] = datacenter_moid,
+
+        return edge
+
+    def _assemble_edge_appliance(self, resource_pool_id, datastore_id):
+        appliance = {}
+        if resource_pool_id:
+            appliance['resourcePoolId'] = resource_pool_id
+        if datastore_id:
+            appliance['datastoreId'] = datastore_id
+        return appliance
+
+    def _assemble_edge_vnic(self, name, index, portgroup_id,
+                            primary_address=None, subnet_mask=None,
+                            secondary=None,
+                            type="internal",
+                            enable_proxy_arp=False,
+                            enable_send_redirects=True,
+                            is_connected=True,
+                            mtu=1500):
+        vnic = {
+            'index': index,
+            'name': name,
+            'type': type,
+            'portgroupId': portgroup_id,
+            'mtu': mtu,
+            'enableProxyArp': enable_proxy_arp,
+            'enableSendRedirects': enable_send_redirects,
+            'isConnected': is_connected
+        }
+        if primary_address and subnet_mask:
+            address_group = {
+                'primaryAddress': primary_address,
+                'subnetMask': subnet_mask
+            }
+            if secondary:
+                address_group['secondaryAddresses'] = {
+                    'ipAddress': secondary
+                }
+
+            vnic['addressGroups'] = {
+                'addressGroups': [address_group]
+            }
+
+        return vnic
+
+    def _edge_status_to_level(self, status):
+        if status == 'GREEN':
+            status_level = RouterStatus.ROUTER_STATUS_ACTIVE
+        elif status in ('GREY', 'YELLOW'):
+            status_level = RouterStatus.ROUTER_STATUS_DOWN
+        else:
+            status_level = RouterStatus.ROUTER_STATUS_ERROR
+        return status_level
+
+    def get_edge_status(self, edge_id):
+        try:
+            response = self.vcns.get_edge_status(edge_id)[1]
+            status_level = self._edge_status_to_level(
+                response['edgeStatus'])
+        except exceptions.VcnsApiException as e:
+            LOG.exception(_("VCNS: Failed to get edge status:\n%s"),
+                          e.response)
+            status_level = RouterStatus.ROUTER_STATUS_ERROR
+            try:
+                desc = jsonutils.loads(e.response)
+                if desc.get('errorCode') == (
+                    vcns_const.VCNS_ERROR_CODE_EDGE_NOT_RUNNING):
+                    status_level = RouterStatus.ROUTER_STATUS_DOWN
+            except ValueError:
+                LOG.exception(e.response)
+
+        return status_level
+
+    def get_edges_statuses(self):
+        edges_status_level = {}
+        edges = self._get_edges()
+        for edge in edges['edgePage'].get('data', []):
+            edge_id = edge['id']
+            status = edge['edgeStatus']
+            edges_status_level[edge_id] = self._edge_status_to_level(status)
+
+        return edges_status_level
+
+    def _update_interface(self, task):
+        edge_id = task.userdata['edge_id']
+        config = task.userdata['config']
+        LOG.debug(_("VCNS: start updating vnic %s"), config)
+        try:
+            self.vcns.update_interface(edge_id, config)
+        except exceptions.VcnsApiException as e:
+            LOG.exception(_("VCNS: Failed to update vnic %(config)s:\n"
+                            "%(response)s"), {
+                                'config': config,
+                                'response': e.response})
+            raise e
+        except Exception as e:
+            LOG.exception(_("VCNS: Failed to update vnic %d"),
+                          config['index'])
+            raise e
+
+        return TaskStatus.COMPLETED
+
+    def update_interface(self, router_id, edge_id, index, network,
+                         address=None, netmask=None, secondary=None,
+                         jobdata=None):
+        LOG.debug(_("VCNS: update vnic %(index)d: %(addr)s %(netmask)s"), {
+            'index': index, 'addr': address, 'netmask': netmask})
+        if index == vcns_const.EXTERNAL_VNIC_INDEX:
+            name = vcns_const.EXTERNAL_VNIC_NAME
+            intf_type = 'uplink'
+        elif index == vcns_const.INTERNAL_VNIC_INDEX:
+            name = vcns_const.INTERNAL_VNIC_NAME
+            intf_type = 'internal'
+        else:
+            msg = _("Vnic %d currently not supported") % index
+            raise exceptions.VcnsGeneralException(msg)
+
+        config = self._assemble_edge_vnic(
+            name, index, network, address, netmask, secondary, type=intf_type)
+
+        userdata = {
+            'edge_id': edge_id,
+            'config': config,
+            'jobdata': jobdata
+        }
+        task_name = "update-interface-%s-%d" % (edge_id, index)
+        task = tasks.Task(task_name, router_id,
+                          self._update_interface, userdata=userdata)
+        task.add_result_monitor(self.callbacks.interface_update_result)
+        self.task_manager.add(task)
+        return task
+
+    def _deploy_edge(self, task):
+        userdata = task.userdata
+        name = userdata['router_name']
+        LOG.debug(_("VCNS: start deploying edge %s"), name)
+        request = userdata['request']
+        try:
+            header = self.vcns.deploy_edge(request)[0]
+            objuri = header['location']
+            job_id = objuri[objuri.rfind("/") + 1:]
+            response = self.vcns.get_edge_id(job_id)[1]
+            edge_id = response['edgeId']
+            LOG.debug(_("VCNS: deploying edge %s"), edge_id)
+            userdata['edge_id'] = edge_id
+            status = TaskStatus.PENDING
+        except exceptions.VcnsApiException as e:
+            LOG.exception(_("VCNS: deploy edge failed for router %s."),
+                          name)
+            raise e
+
+        return status
+
+    def _status_edge(self, task):
+        edge_id = task.userdata['edge_id']
+        try:
+            response = self.vcns.get_edge_deploy_status(edge_id)[1]
+            task.userdata['retries'] = 0
+            system_status = response.get('systemStatus', None)
+            if system_status is None:
+                status = TaskStatus.PENDING
+            elif system_status == 'good':
+                status = TaskStatus.COMPLETED
+            else:
+                status = TaskStatus.ERROR
+        except exceptions.VcnsApiException as e:
+            LOG.exception(_("VCNS: Edge %s status query failed."), edge_id)
+            raise e
+        except Exception as e:
+            retries = task.userdata.get('retries', 0) + 1
+            if retries < 3:
+                task.userdata['retries'] = retries
+                msg = _("VCNS: Unable to retrieve edge %(edge_id)s status. "
+                        "Retry %(retries)d.") % {
+                            'edge_id': edge_id,
+                            'retries': retries}
+                LOG.exception(msg)
+                status = TaskStatus.PENDING
+            else:
+                msg = _("VCNS: Unable to retrieve edge %s status. "
+                        "Abort.") % edge_id
+                LOG.exception(msg)
+                status = TaskStatus.ERROR
+        LOG.debug(_("VCNS: Edge %s status"), edge_id)
+        return status
+
+    def _result_edge(self, task):
+        router_name = task.userdata['router_name']
+        edge_id = task.userdata.get('edge_id')
+        if task.status != TaskStatus.COMPLETED:
+            LOG.error(_("VCNS: Failed to deploy edge %(edge_id)s "
+                        "for %(name)s, status %(status)d"), {
+                            'edge_id': edge_id,
+                            'name': router_name,
+                            'status': task.status
+                        })
+        else:
+            LOG.debug(_("VCNS: Edge %(edge_id)s deployed for "
+                        "router %(name)s"), {
+                            'edge_id': edge_id, 'name': router_name
+                        })
+
+    def _delete_edge(self, task):
+        edge_id = task.userdata['edge_id']
+        LOG.debug(_("VCNS: start destroying edge %s"), edge_id)
+        status = TaskStatus.COMPLETED
+        if edge_id:
+            try:
+                self.vcns.delete_edge(edge_id)
+            except exceptions.ResourceNotFound:
+                pass
+            except exceptions.VcnsApiException as e:
+                msg = _("VCNS: Failed to delete %{edge_id)s:\n"
+                        "%(response)s") % {
+                            'edge_id': edge_id, 'response': e.response}
+                LOG.exception(msg)
+                status = TaskStatus.ERROR
+            except Exception:
+                LOG.exception(_("VCNS: Failed to delete %s"), edge_id)
+                status = TaskStatus.ERROR
+
+        return status
+
+    def _get_edges(self):
+        try:
+            return self.vcns.get_edges()[1]
+        except exceptions.VcnsApiException as e:
+            LOG.exception(_("VCNS: Failed to get edges:\n%s"), e.response)
+            raise e
+
+    def deploy_edge(self, router_id, name, internal_network, jobdata=None,
+                    wait_for_exec=False):
+        task_name = 'deploying-%s' % name
+        edge_name = name
+        edge = self._assemble_edge(
+            edge_name, datacenter_moid=self.datacenter_moid,
+            deployment_container_id=self.deployment_container_id,
+            appliance_size='large', remote_access=True)
+        appliance = self._assemble_edge_appliance(self.resource_pool_id,
+                                                  self.datastore_id)
+        if appliance:
+            edge['appliances']['appliances'] = [appliance]
+
+        vnic_external = self._assemble_edge_vnic(
+            vcns_const.EXTERNAL_VNIC_NAME, vcns_const.EXTERNAL_VNIC_INDEX,
+            self.external_network, type="uplink")
+        edge['vnics']['vnics'].append(vnic_external)
+        vnic_inside = self._assemble_edge_vnic(
+            vcns_const.INTERNAL_VNIC_NAME, vcns_const.INTERNAL_VNIC_INDEX,
+            internal_network,
+            vcns_const.INTEGRATION_EDGE_IPADDRESS,
+            vcns_const.INTEGRATION_SUBNET_NETMASK,
+            type="internal")
+        edge['vnics']['vnics'].append(vnic_inside)
+        userdata = {
+            'request': edge,
+            'router_name': name,
+            'jobdata': jobdata
+        }
+        task = tasks.Task(task_name, router_id,
+                          self._deploy_edge,
+                          status_callback=self._status_edge,
+                          result_callback=self._result_edge,
+                          userdata=userdata)
+        task.add_executed_monitor(self.callbacks.edge_deploy_started)
+        task.add_result_monitor(self.callbacks.edge_deploy_result)
+        self.task_manager.add(task)
+
+        if wait_for_exec:
+            # waitl until the deploy task is executed so edge_id is available
+            task.wait(TaskState.EXECUTED)
+
+        return task
+
+    def delete_edge(self, router_id, edge_id, jobdata=None):
+        task_name = 'delete-%s' % edge_id
+        userdata = {
+            'router_id': router_id,
+            'edge_id': edge_id,
+            'jobdata': jobdata
+        }
+        task = tasks.Task(task_name, router_id, self._delete_edge,
+                          userdata=userdata)
+        task.add_result_monitor(self.callbacks.edge_delete_result)
+        self.task_manager.add(task)
+        return task
+
+    def _assemble_nat_rule(self, action, original_address,
+                           translated_address,
+                           vnic_index=vcns_const.EXTERNAL_VNIC_INDEX,
+                           enabled=True):
+        nat_rule = {}
+        nat_rule['action'] = action
+        nat_rule['vnic'] = vnic_index
+        nat_rule['originalAddress'] = original_address
+        nat_rule['translatedAddress'] = translated_address
+        nat_rule['enabled'] = enabled
+        return nat_rule
+
+    def get_nat_config(self, edge_id):
+        try:
+            return self.vcns.get_nat_config(edge_id)[1]
+        except exceptions.VcnsApiException as e:
+            LOG.exception(_("VCNS: Failed to get nat config:\n%s"),
+                          e.response)
+            raise e
+
+    def _create_nat_rule(self, task):
+        # TODO(fank): use POST for optimization
+        #             return rule_id for future reference
+        rule = task.userdata['rule']
+        LOG.debug(_("VCNS: start creating nat rules: %s"), rule)
+        edge_id = task.userdata['edge_id']
+        nat = self.get_nat_config(edge_id)
+        location = task.userdata['location']
+
+        del nat['version']
+
+        if location is None or location == vcns_const.APPEND:
+            nat['rules']['natRulesDtos'].append(rule)
+        else:
+            nat['rules']['natRulesDtos'].insert(location, rule)
+
+        try:
+            self.vcns.update_nat_config(edge_id, nat)
+            status = TaskStatus.COMPLETED
+        except exceptions.VcnsApiException as e:
+            LOG.exception(_("VCNS: Failed to create snat rule:\n%s"),
+                          e.response)
+            status = TaskStatus.ERROR
+
+        return status
+
+    def create_snat_rule(self, router_id, edge_id, src, translated,
+                         jobdata=None, location=None):
+        LOG.debug(_("VCNS: create snat rule %(src)s/%(translated)s"), {
+            'src': src, 'translated': translated})
+        snat_rule = self._assemble_nat_rule("snat", src, translated)
+        userdata = {
+            'router_id': router_id,
+            'edge_id': edge_id,
+            'rule': snat_rule,
+            'location': location,
+            'jobdata': jobdata
+        }
+        task_name = "create-snat-%s-%s-%s" % (edge_id, src, translated)
+        task = tasks.Task(task_name, router_id, self._create_nat_rule,
+                          userdata=userdata)
+        task.add_result_monitor(self.callbacks.snat_create_result)
+        self.task_manager.add(task)
+        return task
+
+    def _delete_nat_rule(self, task):
+        # TODO(fank): pass in rule_id for optimization
+        #             handle routes update for optimization
+        edge_id = task.userdata['edge_id']
+        address = task.userdata['address']
+        addrtype = task.userdata['addrtype']
+        LOG.debug(_("VCNS: start deleting %(type)s rules: %(addr)s"), {
+            'type': addrtype, 'addr': address})
+        nat = self.get_nat_config(edge_id)
+        del nat['version']
+        status = TaskStatus.COMPLETED
+        for nat_rule in nat['rules']['natRulesDtos']:
+            if nat_rule[addrtype] == address:
+                rule_id = nat_rule['ruleId']
+                try:
+                    self.vcns.delete_nat_rule(edge_id, rule_id)
+                except exceptions.VcnsApiException as e:
+                    LOG.exception(_("VCNS: Failed to delete snat rule:\n"
+                                    "%s"), e.response)
+                    status = TaskStatus.ERROR
+
+        return status
+
+    def delete_snat_rule(self, router_id, edge_id, src, jobdata=None):
+        LOG.debug(_("VCNS: delete snat rule %s"), src)
+        userdata = {
+            'edge_id': edge_id,
+            'address': src,
+            'addrtype': 'originalAddress',
+            'jobdata': jobdata
+        }
+        task_name = "delete-snat-%s-%s" % (edge_id, src)
+        task = tasks.Task(task_name, router_id, self._delete_nat_rule,
+                          userdata=userdata)
+        task.add_result_monitor(self.callbacks.snat_delete_result)
+        self.task_manager.add(task)
+        return task
+
+    def create_dnat_rule(self, router_id, edge_id, dst, translated,
+                         jobdata=None, location=None):
+        # TODO(fank): use POST for optimization
+        #             return rule_id for future reference
+        LOG.debug(_("VCNS: create dnat rule %(dst)s/%(translated)s"), {
+            'dst': dst, 'translated': translated})
+        dnat_rule = self._assemble_nat_rule(
+            "dnat", dst, translated)
+        userdata = {
+            'router_id': router_id,
+            'edge_id': edge_id,
+            'rule': dnat_rule,
+            'location': location,
+            'jobdata': jobdata
+        }
+        task_name = "create-dnat-%s-%s-%s" % (edge_id, dst, translated)
+        task = tasks.Task(task_name, router_id, self._create_nat_rule,
+                          userdata=userdata)
+        task.add_result_monitor(self.callbacks.dnat_create_result)
+        self.task_manager.add(task)
+        return task
+
+    def delete_dnat_rule(self, router_id, edge_id, translated,
+                         jobdata=None):
+        # TODO(fank): pass in rule_id for optimization
+        LOG.debug(_("VCNS: delete dnat rule %s"), translated)
+        userdata = {
+            'edge_id': edge_id,
+            'address': translated,
+            'addrtype': 'translatedAddress',
+            'jobdata': jobdata
+        }
+        task_name = "delete-dnat-%s-%s" % (edge_id, translated)
+        task = tasks.Task(task_name, router_id, self._delete_nat_rule,
+                          userdata=userdata)
+        task.add_result_monitor(self.callbacks.dnat_delete_result)
+        self.task_manager.add(task)
+        return task
+
+    def _update_nat_rule(self, task):
+        # TODO(fank): use POST for optimization
+        #             return rule_id for future reference
+        edge_id = task.userdata['edge_id']
+        if task != self.updated_task['nat'][edge_id]:
+            # this task does not have the latest config, abort now
+            # for speedup
+            return TaskStatus.ABORT
+
+        rules = task.userdata['rules']
+        LOG.debug(_("VCNS: start updating nat rules: %s"), rules)
+
+        nat = {
+            'rules': {
+                'natRulesDtos': rules
+            }
+        }
+
+        try:
+            self.vcns.update_nat_config(edge_id, nat)
+            status = TaskStatus.COMPLETED
+        except exceptions.VcnsApiException as e:
+            LOG.exception(_("VCNS: Failed to create snat rule:\n%s"),
+                          e.response)
+            status = TaskStatus.ERROR
+
+        return status
+
+    def update_nat_rules(self, router_id, edge_id, snats, dnats,
+                         jobdata=None):
+        LOG.debug(_("VCNS: update nat rule\n"
+                    "SNAT:%(snat)s\n"
+                    "DNAT:%(dnat)s\n"), {
+                        'snat': snats, 'dnat': dnats})
+        nat_rules = []
+
+        for dnat in dnats:
+            nat_rules.append(self._assemble_nat_rule(
+                'dnat', dnat['dst'], dnat['translated']))
+            nat_rules.append(self._assemble_nat_rule(
+                'snat', dnat['translated'], dnat['dst']))
+
+        for snat in snats:
+            nat_rules.append(self._assemble_nat_rule(
+                'snat', snat['src'], snat['translated']))
+
+        userdata = {
+            'edge_id': edge_id,
+            'rules': nat_rules,
+            'jobdata': jobdata,
+        }
+        task_name = "update-nat-%s" % edge_id
+        task = tasks.Task(task_name, router_id, self._update_nat_rule,
+                          userdata=userdata)
+        task.add_result_monitor(self.callbacks.nat_update_result)
+        self.updated_task['nat'][edge_id] = task
+        self.task_manager.add(task)
+        return task
+
+    def _update_routes(self, task):
+        edge_id = task.userdata['edge_id']
+        if (task != self.updated_task['route'][edge_id] and
+            task.userdata.get('skippable', True)):
+            # this task does not have the latest config, abort now
+            # for speedup
+            return TaskStatus.ABORT
+        gateway = task.userdata['gateway']
+        routes = task.userdata['routes']
+        LOG.debug(_("VCNS: start updating routes for %s"), edge_id)
+        static_routes = []
+        for route in routes:
+            static_routes.append({
+                "route": {
+                    "description": "",
+                    "vnic": vcns_const.INTERNAL_VNIC_INDEX,
+                    "network": route['cidr'],
+                    "nextHop": route['nexthop']
+                }
+            })
+        request = {
+            "staticRouting": {
+                "staticRoutes": static_routes,
+            }
+        }
+        if gateway:
+            request["staticRouting"]["defaultRoute"] = {
+                "description": "default-gateway",
+                "gatewayAddress": gateway,
+                "vnic": vcns_const.EXTERNAL_VNIC_INDEX
+            }
+        try:
+            self.vcns.update_routes(edge_id, request)
+            status = TaskStatus.COMPLETED
+        except exceptions.VcnsApiException as e:
+            LOG.exception(_("VCNS: Failed to update routes:\n%s"),
+                          e.response)
+            status = TaskStatus.ERROR
+
+        return status
+
+    def update_routes(self, router_id, edge_id, gateway, routes,
+                      skippable=True, jobdata=None):
+        if gateway:
+            gateway = gateway.split('/')[0]
+
+        userdata = {
+            'edge_id': edge_id,
+            'gateway': gateway,
+            'routes': routes,
+            'skippable': skippable,
+            'jobdata': jobdata
+        }
+        task_name = "update-routes-%s" % (edge_id)
+        task = tasks.Task(task_name, router_id, self._update_routes,
+                          userdata=userdata)
+        task.add_result_monitor(self.callbacks.routes_update_result)
+        self.updated_task['route'][edge_id] = task
+        self.task_manager.add(task)
+        return task
+
+    def create_lswitch(self, name, tz_config):
+        lsconfig = {
+            'display_name': name,
+            "tags": [],
+            "type": "LogicalSwitchConfig",
+            "_schema": "/ws.v1/schema/LogicalSwitchConfig",
+            "port_isolation_enabled": False,
+            "replication_mode": "service",
+            "transport_zones": tz_config
+        }
+
+        response = self.vcns.create_lswitch(lsconfig)[1]
+        return response
+
+    def delete_lswitch(self, lswitch_id):
+        self.vcns.delete_lswitch(lswitch_id)
diff --git a/neutron/plugins/nicira/vshield/tasks/__init__.py b/neutron/plugins/nicira/vshield/tasks/__init__.py
new file mode 100644 (file)
index 0000000..c020e3b
--- /dev/null
@@ -0,0 +1,16 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 VMware, Inc.
+# All Rights Reserved
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
diff --git a/neutron/plugins/nicira/vshield/tasks/constants.py b/neutron/plugins/nicira/vshield/tasks/constants.py
new file mode 100755 (executable)
index 0000000..c48debb
--- /dev/null
@@ -0,0 +1,46 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 VMware, Inc.
+# All Rights Reserved
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+
+class TaskStatus(object):
+    """Task running status.
+
+    This is used by execution/status callback function to notify the
+    task manager what's the status of current task, and also used for
+    indication the final task execution result.
+    """
+    PENDING = 1
+    COMPLETED = 2
+    ERROR = 3
+    ABORT = 4
+
+
+class TaskState(object):
+    """Current state of a task.
+
+    This is to keep track of the current state of a task.
+    NONE: the task is still in the queue
+    START: the task is pull out from the queue and is about to be executed
+    EXECUTED: the task has been executed
+    STATUS: we're running periodic status check for this task
+    RESULT: the task has finished and result is ready
+    """
+    NONE = -1
+    START = 0
+    EXECUTED = 1
+    STATUS = 2
+    RESULT = 3
diff --git a/neutron/plugins/nicira/vshield/tasks/tasks.py b/neutron/plugins/nicira/vshield/tasks/tasks.py
new file mode 100755 (executable)
index 0000000..5a76986
--- /dev/null
@@ -0,0 +1,385 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 VMware, Inc.
+# All Rights Reserved
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import collections
+import uuid
+
+from eventlet import event
+from eventlet import greenthread
+from eventlet.support import greenlets as greenlet
+
+from neutron.common import exceptions
+from neutron.openstack.common import log as logging
+from neutron.openstack.common import loopingcall
+from neutron.plugins.nicira.vshield.tasks.constants import TaskState
+from neutron.plugins.nicira.vshield.tasks.constants import TaskStatus
+
+DEFAULT_INTERVAL = 1000
+
+LOG = logging.getLogger(__name__)
+
+
+def nop(task):
+    return TaskStatus.COMPLETED
+
+
+class TaskException(exceptions.NeutronException):
+
+    def __init__(self, message=None, **kwargs):
+        if message is not None:
+            self.message = message
+
+        super(TaskException, self).__init__(**kwargs)
+
+
+class InvalidState(TaskException):
+    message = _("Invalid state %(state)d")
+
+
+class TaskStateSkipped(TaskException):
+    message = _("State %(state)d skipped. Current state %(current)d")
+
+
+class Task():
+    def __init__(self, name, resource_id, execute_callback,
+                 status_callback=nop, result_callback=nop, userdata=None):
+        self.name = name
+        self.resource_id = resource_id
+        self._execute_callback = execute_callback
+        self._status_callback = status_callback
+        self._result_callback = result_callback
+        self.userdata = userdata
+        self.id = None
+        self.status = None
+
+        self._monitors = {
+            TaskState.START: [],
+            TaskState.EXECUTED: [],
+            TaskState.RESULT: []
+        }
+        self._states = [None, None, None, None]
+        self._state = TaskState.NONE
+
+    def _add_monitor(self, action, func):
+        self._monitors[action].append(func)
+        return self
+
+    def _move_state(self, state):
+        self._state = state
+        if self._states[state] is not None:
+            e = self._states[state]
+            self._states[state] = None
+            e.send()
+
+        for s in range(state):
+            if self._states[s] is not None:
+                e = self._states[s]
+                self._states[s] = None
+                e.send_exception(
+                    TaskStateSkipped(state=s, current=self._state))
+
+    def _invoke_monitor(self, state):
+        for func in self._monitors[state]:
+            try:
+                func(self)
+            except Exception:
+                msg = _("Task %(task)s encountered exception in %(func)s "
+                        "at state %(state)s") % {
+                            'task': str(self),
+                            'func': str(func),
+                            'state': state}
+                LOG.exception(msg)
+
+        self._move_state(state)
+
+        return self
+
+    def _start(self):
+        return self._invoke_monitor(TaskState.START)
+
+    def _executed(self):
+        return self._invoke_monitor(TaskState.EXECUTED)
+
+    def _update_status(self, status):
+        if self.status == status:
+            return self
+
+        self.status = status
+
+    def _finished(self):
+        return self._invoke_monitor(TaskState.RESULT)
+
+    def add_start_monitor(self, func):
+        return self._add_monitor(TaskState.START, func)
+
+    def add_executed_monitor(self, func):
+        return self._add_monitor(TaskState.EXECUTED, func)
+
+    def add_result_monitor(self, func):
+        return self._add_monitor(TaskState.RESULT, func)
+
+    def wait(self, state):
+        if (state < TaskState.START or
+            state > TaskState.RESULT or
+            state == TaskState.STATUS):
+            raise InvalidState(state=state)
+
+        if state <= self._state:
+            # we already passed this current state, so no wait
+            return
+
+        e = event.Event()
+        self._states[state] = e
+        e.wait()
+
+    def __repr__(self):
+        return "Task-%s-%s-%s" % (
+            self.name, self.resource_id, self.id)
+
+
+class TaskManager():
+
+    _instance = None
+    _default_interval = DEFAULT_INTERVAL
+
+    def __init__(self, interval=None):
+        self._interval = interval or TaskManager._default_interval
+
+        # A queue to pass tasks from other threads
+        self._tasks_queue = collections.deque()
+
+        # A dict to store resource -> resource's tasks
+        self._tasks = {}
+
+        # New request event
+        self._req = event.Event()
+
+        # TaskHandler stopped event
+        self._stopped = event.Event()
+
+        # Periodic function trigger
+        self._monitor = None
+        self._monitor_busy = False
+        self._monitor_stop = None
+
+        # Thread handling the task request
+        self._thread = None
+
+    def _execute(self, task):
+        """Execute task."""
+        msg = _("Start task %s") % str(task)
+        LOG.debug(msg)
+        task._start()
+        try:
+            status = task._execute_callback(task)
+        except Exception:
+            msg = _("Task %(task)s encountered exception in %(cb)s") % {
+                'task': str(task),
+                'cb': str(task._execute_callback)}
+            LOG.exception(msg)
+            status = TaskStatus.ERROR
+
+        LOG.debug(_("Task %(task)s return %(status)s"), {
+            'task': str(task),
+            'status': status})
+
+        task._update_status(status)
+        task._executed()
+
+        return status
+
+    def _result(self, task):
+        """Notify task execution result."""
+        try:
+            task._result_callback(task)
+        except Exception:
+            msg = _("Task %(task)s encountered exception in %(cb)s") % {
+                'task': str(task),
+                'cb': str(task._result_callback)}
+            LOG.exception(msg)
+
+        LOG.debug(_("Task %(task)s return %(status)s") % {
+            'task': str(task),
+            'status': task.status})
+
+        task._finished()
+
+    def _check_pending_tasks(self):
+        """Check all pending tasks status."""
+        for resource_id in self._tasks.keys():
+            if self._monitor_stop:
+                # looping call is asked to stop, return now
+                return
+
+            tasks = self._tasks[resource_id]
+            # only the first task is executed and pending
+            task = tasks[0]
+            try:
+                status = task._status_callback(task)
+            except Exception:
+                msg = _("Task %(task)s encountered exception in %(cb)s") % {
+                    'task': str(task),
+                    'cb': str(task._status_callback)}
+                LOG.exception(msg)
+                status = TaskStatus.ERROR
+            task._update_status(status)
+            if status != TaskStatus.PENDING:
+                self._dequeue(task, True)
+
+    def _enqueue(self, task):
+        if task.resource_id in self._tasks:
+            # append to existing resource queue for ordered processing
+            self._tasks[task.resource_id].append(task)
+        else:
+            # put the task to a new resource queue
+            tasks = collections.deque()
+            tasks.append(task)
+            self._tasks[task.resource_id] = tasks
+
+    def _dequeue(self, task, run_next):
+        self._result(task)
+        tasks = self._tasks[task.resource_id]
+        tasks.remove(task)
+        if not tasks:
+            # no more tasks for this resource
+            del self._tasks[task.resource_id]
+            return
+
+        if run_next:
+            # process next task for this resource
+            while tasks:
+                task = tasks[0]
+                status = self._execute(task)
+                if status == TaskStatus.PENDING:
+                    break
+                self._dequeue(task, False)
+
+    def _abort(self):
+        """Abort all tasks."""
+        for resource_id in self._tasks.keys():
+            tasks = list(self._tasks[resource_id])
+            for task in tasks:
+                task._update_status(TaskStatus.ABORT)
+                self._dequeue(task, False)
+
+    def _get_task(self):
+        """Get task request."""
+        while True:
+            for t in self._tasks_queue:
+                return self._tasks_queue.popleft()
+            self._req.wait()
+            self._req.reset()
+
+    def run(self):
+        while True:
+            try:
+                # get a task from queue, or timeout for periodic status check
+                task = self._get_task()
+                if task.resource_id in self._tasks:
+                    # this resource already has some tasks under processing,
+                    # append the task to same queue for ordered processing
+                    self._enqueue(task)
+                    continue
+
+                status = self._execute(task)
+
+                if status != TaskStatus.PENDING:
+                    self._result(task)
+                    continue
+
+                self._enqueue(task)
+            except greenlet.GreenletExit:
+                break
+            except Exception:
+                LOG.exception(_("TaskManager terminated"))
+                break
+
+        self._monitor.stop()
+        if self._monitor_busy:
+            self._monitor_stop = event.Event()
+            self._monitor_stop.wait()
+            self._monitor_stop = None
+        self._abort()
+        self._stopped.send()
+
+    def add(self, task):
+        task.id = uuid.uuid1()
+        self._tasks_queue.append(task)
+        if not self._req.ready():
+            self._req.send()
+        return task.id
+
+    def stop(self):
+        if not self._thread:
+            return
+        self._thread.kill()
+        self._stopped.wait()
+        self._thread = None
+
+    def has_pending_task(self):
+        if self._tasks_queue:
+            return True
+
+        if self._tasks:
+            return True
+
+        return False
+
+    def show_pending_tasks(self):
+        for task in self._tasks_queue:
+            print str(task)
+        for resource, tasks in self._tasks.iteritems():
+            for task in tasks:
+                print str(task)
+
+    def count(self):
+        count = 0
+        for resource_id, tasks in self._tasks.iteritems():
+            count += len(tasks)
+        return count
+
+    def start(self, interval=None):
+        def _inner():
+            self.run()
+
+        def _loopingcall_callback():
+            try:
+                self._monitor_busy = True
+                self._check_pending_tasks()
+                self._monitor_busy = False
+                if self._monitor_stop:
+                    self._monitor_stop.send()
+            except Exception:
+                LOG.exception(_("Exception in _check_pending_tasks"))
+
+        if self._thread:
+            return self
+
+        if interval is None or interval == 0:
+            interval = self._interval
+
+        self._thread = greenthread.spawn(_inner)
+        self._monitor = loopingcall.FixedIntervalLoopingCall(
+            _loopingcall_callback)
+        self._monitor.start(interval / 1000.0,
+                            interval / 1000.0)
+
+        return self
+
+    @classmethod
+    def set_default_interval(cls, interval):
+        cls._default_interval = interval
diff --git a/neutron/plugins/nicira/vshield/vcns.py b/neutron/plugins/nicira/vshield/vcns.py
new file mode 100644 (file)
index 0000000..72e5e85
--- /dev/null
@@ -0,0 +1,111 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 VMware, Inc
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: linb, VMware
+
+from neutron.openstack.common import jsonutils
+from neutron.openstack.common import log as logging
+from neutron.plugins.nicira.vshield.common import VcnsApiClient
+
+LOG = logging.getLogger(__name__)
+
+HTTP_GET = "GET"
+HTTP_POST = "POST"
+HTTP_DELETE = "DELETE"
+HTTP_PUT = "PUT"
+URI_PREFIX = "/api/4.0/edges"
+
+
+class Vcns(object):
+
+    def __init__(self, address, user, password):
+        self.address = address
+        self.user = user
+        self.password = password
+        self.jsonapi_client = VcnsApiClient.VcnsApiHelper(address, user,
+                                                          password, 'json')
+        # TODO(fank): remove this after json syntax is fixed on VSM
+        self.xmlapi_client = VcnsApiClient.VcnsApiHelper(address, user,
+                                                         password, 'xml')
+
+    def do_request(self, method, uri, params=None, format='json', **kwargs):
+        LOG.debug(_("VcnsApiHelper('%(method)s', '%(uri)s', '%(body)s')"), {
+                  'method': method,
+                  'uri': uri,
+                  'body': jsonutils.dumps(params)})
+        if format == 'json':
+            header, content = self.jsonapi_client.request(method, uri, params)
+        else:
+            header, content = self.xmlapi_client.request(method, uri, params)
+        LOG.debug(_("Header: '%s'"), header)
+        LOG.debug(_("Content: '%s'"), content)
+        if content == '':
+            return header, {}
+        if kwargs.get('decode', True):
+            content = jsonutils.loads(content)
+        return header, content
+
+    def deploy_edge(self, request):
+        uri = URI_PREFIX + "?async=true"
+        return self.do_request(HTTP_POST, uri, request, decode=False)
+
+    def get_edge_id(self, job_id):
+        uri = URI_PREFIX + "/jobs/%s" % job_id
+        return self.do_request(HTTP_GET, uri, decode=True)
+
+    def get_edge_deploy_status(self, edge_id):
+        uri = URI_PREFIX + "/%s/status?getlatest=false" % edge_id
+        return self.do_request(HTTP_GET, uri, decode="True")
+
+    def delete_edge(self, edge_id):
+        uri = "%s/%s" % (URI_PREFIX, edge_id)
+        return self.do_request(HTTP_DELETE, uri)
+
+    def update_interface(self, edge_id, vnic):
+        uri = "%s/%s/vnics/%d" % (URI_PREFIX, edge_id, vnic['index'])
+        return self.do_request(HTTP_PUT, uri, vnic, decode=True)
+
+    def get_nat_config(self, edge_id):
+        uri = "%s/%s/nat/config" % (URI_PREFIX, edge_id)
+        return self.do_request(HTTP_GET, uri, decode=True)
+
+    def update_nat_config(self, edge_id, nat):
+        uri = "%s/%s/nat/config" % (URI_PREFIX, edge_id)
+        return self.do_request(HTTP_PUT, uri, nat, decode=True)
+
+    def delete_nat_rule(self, edge_id, rule_id):
+        uri = "%s/%s/nat/config/rules/%s" % (URI_PREFIX, edge_id, rule_id)
+        return self.do_request(HTTP_DELETE, uri, decode=True)
+
+    def get_edge_status(self, edge_id):
+        uri = "%s/%s/status?getlatest=false" % (URI_PREFIX, edge_id)
+        return self.do_request(HTTP_GET, uri, decode=True)
+
+    def get_edges(self):
+        uri = URI_PREFIX
+        return self.do_request(HTTP_GET, uri, decode=True)
+
+    def update_routes(self, edge_id, routes):
+        uri = "%s/%s/routing/config/static" % (URI_PREFIX, edge_id)
+        return self.do_request(HTTP_PUT, uri, routes, format='xml')
+
+    def create_lswitch(self, lsconfig):
+        uri = "/api/ws.v1/lswitch"
+        return self.do_request(HTTP_POST, uri, lsconfig, decode=True)
+
+    def delete_lswitch(self, lswitch_id):
+        uri = "/api/ws.v1/lswitch/%s" % lswitch_id
+        return self.do_request(HTTP_DELETE, uri)
diff --git a/neutron/plugins/nicira/vshield/vcns_driver.py b/neutron/plugins/nicira/vshield/vcns_driver.py
new file mode 100644 (file)
index 0000000..f5f6903
--- /dev/null
@@ -0,0 +1,44 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 VMware, Inc
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: linb, VMware
+
+from oslo.config import cfg
+
+from neutron.plugins.nicira.common import config  # noqa
+from neutron.plugins.nicira.vshield import edge_appliance_driver
+from neutron.plugins.nicira.vshield.tasks import tasks
+from neutron.plugins.nicira.vshield import vcns
+
+
+class VcnsDriver(edge_appliance_driver.EdgeApplianceDriver):
+    def __init__(self, callbacks):
+        super(VcnsDriver, self).__init__()
+
+        self.callbacks = callbacks
+        self.vcns_uri = cfg.CONF.vcns.manager_uri
+        self.vcns_user = cfg.CONF.vcns.user
+        self.vcns_passwd = cfg.CONF.vcns.password
+        self.datacenter_moid = cfg.CONF.vcns.datacenter_moid
+        self.deployment_container_id = cfg.CONF.vcns.deployment_container_id
+        self.resource_pool_id = cfg.CONF.vcns.resource_pool_id
+        self.datastore_id = cfg.CONF.vcns.datastore_id
+        self.external_network = cfg.CONF.vcns.external_network
+        interval = cfg.CONF.vcns.task_status_check_interval
+        self.task_manager = tasks.TaskManager(interval)
+        self.task_manager.start()
+
+        self.vcns = vcns.Vcns(self.vcns_uri, self.vcns_user, self.vcns_passwd)
index dd4cc8360d6710b5fc646035321051e66e47502b..c4a742471b96044f1081d24def2af1115eba78e5 100644 (file)
 import os
 
 import neutron.plugins.nicira.api_client.client_eventlet as client
+from neutron.plugins.nicira import extensions
 import neutron.plugins.nicira.NeutronPlugin as plugin
 import neutron.plugins.nicira.NvpApiClient as nvpapi
+from neutron.plugins.nicira.vshield import vcns
 
 nvp_plugin = plugin.NvpPluginV2
 api_helper = nvpapi.NVPApiHelper
 nvp_client = client.NvpApiClientEventlet
+vcns_class = vcns.Vcns
 
 STUBS_PATH = os.path.join(os.path.dirname(__file__), 'etc')
-NVPEXT_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)),
-                           "../../plugins/nicira/extensions")
+NVPEXT_PATH = os.path.dirname(extensions.__file__)
 NVPAPI_NAME = '%s.%s' % (api_helper.__module__, api_helper.__name__)
 PLUGIN_NAME = '%s.%s' % (nvp_plugin.__module__, nvp_plugin.__name__)
 CLIENT_NAME = '%s.%s' % (nvp_client.__module__, nvp_client.__name__)
+VCNS_NAME = '%s.%s' % (vcns_class.__module__, vcns_class.__name__)
 
 
 def get_fake_conf(filename):
diff --git a/neutron/tests/unit/nicira/etc/vcns.ini.test b/neutron/tests/unit/nicira/etc/vcns.ini.test
new file mode 100644 (file)
index 0000000..38b3361
--- /dev/null
@@ -0,0 +1,9 @@
+[vcns]
+manager_uri = https://fake-host
+user = fake-user
+passwordd = fake-password
+datacenter_moid = fake-moid
+resource_pool_id = fake-resgroup
+datastore_id = fake-datastore
+external_network = fake-ext-net
+task_status_check_interval = 100
diff --git a/neutron/tests/unit/nicira/test_vcns_driver.py b/neutron/tests/unit/nicira/test_vcns_driver.py
new file mode 100644 (file)
index 0000000..b5867f4
--- /dev/null
@@ -0,0 +1,541 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 OpenStack Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from eventlet import greenthread
+import mock
+
+from neutron.common import config as n_config
+from neutron.plugins.nicira.vshield.common import (
+    constants as vcns_const)
+from neutron.plugins.nicira.vshield.common.constants import RouterStatus
+from neutron.plugins.nicira.vshield.tasks.constants import TaskState
+from neutron.plugins.nicira.vshield.tasks.constants import TaskStatus
+from neutron.plugins.nicira.vshield.tasks import tasks as ts
+from neutron.plugins.nicira.vshield import vcns_driver
+from neutron.tests import base
+from neutron.tests.unit.nicira import get_fake_conf
+from neutron.tests.unit.nicira import VCNS_NAME
+from neutron.tests.unit.nicira.vshield import fake_vcns
+
+VCNS_CONFIG_FILE = get_fake_conf("vcns.ini.test")
+
+ts.TaskManager.set_default_interval(100)
+
+
+class VcnsDriverTaskManagerTestCase(base.BaseTestCase):
+
+    def setUp(self):
+        super(VcnsDriverTaskManagerTestCase, self).setUp()
+        self.manager = ts.TaskManager()
+        self.manager.start(100)
+
+    def tearDown(self):
+        self.manager.stop()
+        super(VcnsDriverTaskManagerTestCase, self).tearDown()
+
+    def _test_task_manager_task_process_state(self, sync_exec=False):
+        def _task_failed(task, reason):
+            task.userdata['result'] = False
+            task.userdata['error'] = reason
+
+        def _check_state(task, exp_state):
+            if not task.userdata.get('result', True):
+                return False
+
+            state = task.userdata['state']
+            if state != exp_state:
+                msg = "state %d expect %d" % (
+                    state, exp_state)
+                _task_failed(task, msg)
+                return False
+
+            task.userdata['state'] = state + 1
+            return True
+
+        def _exec(task):
+            if not _check_state(task, 1):
+                return TaskStatus.ERROR
+
+            if task.userdata['sync_exec']:
+                return TaskStatus.COMPLETED
+            else:
+                return TaskStatus.PENDING
+
+        def _status(task):
+            if task.userdata['sync_exec']:
+                _task_failed(task, "_status callback triggered")
+
+            state = task.userdata['state']
+            if state == 3:
+                _check_state(task, 3)
+                return TaskStatus.PENDING
+            else:
+                _check_state(task, 4)
+                return TaskStatus.COMPLETED
+
+        def _result(task):
+            if task.userdata['sync_exec']:
+                exp_state = 3
+            else:
+                exp_state = 5
+
+            _check_state(task, exp_state)
+
+        def _start_monitor(task):
+            _check_state(task, 0)
+
+        def _executed_monitor(task):
+            _check_state(task, 2)
+
+        def _result_monitor(task):
+            if task.userdata['sync_exec']:
+                exp_state = 4
+            else:
+                exp_state = 6
+
+            if _check_state(task, exp_state):
+                task.userdata['result'] = True
+            else:
+                task.userdata['result'] = False
+
+        userdata = {
+            'state': 0,
+            'sync_exec': sync_exec
+        }
+        task = ts.Task('name', 'res', _exec, _status, _result, userdata)
+        task.add_start_monitor(_start_monitor)
+        task.add_executed_monitor(_executed_monitor)
+        task.add_result_monitor(_result_monitor)
+
+        self.manager.add(task)
+
+        task.wait(TaskState.RESULT)
+
+        if 'error' in userdata:
+            print userdata['error']
+
+        self.assertTrue(userdata['result'])
+
+    def test_task_manager_task_sync_exec_process_state(self):
+        self._test_task_manager_task_process_state(sync_exec=True)
+
+    def test_task_manager_task_async_exec_process_state(self):
+        self._test_task_manager_task_process_state(sync_exec=False)
+
+    def test_task_manager_task_ordered_process(self):
+        def _task_failed(task, reason):
+            task.userdata['result'] = False
+            task.userdata['error'] = reason
+
+        def _exec(task):
+            task.userdata['executed'] = True
+            return TaskStatus.PENDING
+
+        def _status(task):
+            return TaskStatus.COMPLETED
+
+        def _result(task):
+            next_task = task.userdata.get('next')
+            if next_task:
+                if next_task.userdata.get('executed'):
+                    _task_failed(next_task, "executed premature")
+            if task.userdata.get('result', True):
+                task.userdata['result'] = True
+
+        tasks = []
+        prev = None
+        last_task = None
+        for i in range(5):
+            name = "name-%d" % i
+            task = ts.Task(name, 'res', _exec, _status, _result, {})
+            tasks.append(task)
+            if prev:
+                prev.userdata['next'] = task
+            prev = task
+            last_task = task
+
+        for task in tasks:
+            self.manager.add(task)
+
+        last_task.wait(TaskState.RESULT)
+
+        for task in tasks:
+            if 'error' in task.userdata:
+                print "Task %s failed: " % (
+                    tasks.name, tasks.userdata['error'])
+
+        for task in tasks:
+            self.assertTrue(task.userdata['result'])
+
+    def test_task_manager_task_parallel_process(self):
+        tasks = []
+
+        def _exec(task):
+            task.userdata['executed'] = True
+            return TaskStatus.PENDING
+
+        def _status(task):
+            for t in tasks:
+                if not t.userdata.get('executed'):
+                    t.userdata['resut'] = False
+            return TaskStatus.COMPLETED
+
+        def _result(task):
+            if (task.userdata.get('result') is None and
+                task.status == TaskStatus.COMPLETED):
+                task.userdata['result'] = True
+            else:
+                task.userdata['result'] = False
+
+        for i in range(5):
+            name = "name-%d" % i
+            res = 'resource-%d' % i
+            task = ts.Task(name, res, _exec, _status, _result, {})
+            tasks.append(task)
+            self.manager.add(task)
+
+        for task in tasks:
+            task.wait(TaskState.RESULT)
+            self.assertTrue(task.userdata['result'])
+
+    def test_task_manager_stop(self):
+        def _exec(task):
+            return TaskStatus.PENDING
+
+        def _status(task):
+            greenthread.sleep(0.1)
+            return TaskStatus.PENDING
+
+        def _result(task):
+            pass
+
+        manager = ts.TaskManager().start(100)
+
+        alltasks = {}
+        for i in range(100):
+            res = 'res-%d' % i
+            tasks = []
+            for i in range(100):
+                task = ts.Task('name', res, _exec, _status, _result)
+                manager.add(task)
+                tasks.append(task)
+            alltasks[res] = tasks
+
+        greenthread.sleep(2)
+        manager.stop()
+        for res, tasks in alltasks.iteritems():
+            for task in tasks:
+                self.assertEqual(task.status, TaskStatus.ABORT)
+
+
+class VcnsDriverTestCase(base.BaseTestCase):
+
+    def vcns_patch(self):
+        instance = self.mock_vcns.start()
+        instance.return_value.deploy_edge.side_effect = self.fc.deploy_edge
+        instance.return_value.get_edge_id.side_effect = self.fc.get_edge_id
+        instance.return_value.get_edge_deploy_status.side_effect = (
+            self.fc.get_edge_deploy_status)
+        instance.return_value.delete_edge.side_effect = self.fc.delete_edge
+        instance.return_value.update_interface.side_effect = (
+            self.fc.update_interface)
+        instance.return_value.get_nat_config.side_effect = (
+            self.fc.get_nat_config)
+        instance.return_value.update_nat_config.side_effect = (
+            self.fc.update_nat_config)
+        instance.return_value.delete_nat_rule.side_effect = (
+            self.fc.delete_nat_rule)
+        instance.return_value.get_edge_status.side_effect = (
+            self.fc.get_edge_status)
+        instance.return_value.get_edges.side_effect = self.fc.get_edges
+        instance.return_value.update_routes.side_effect = (
+            self.fc.update_routes)
+        instance.return_value.create_lswitch.side_effect = (
+            self.fc.create_lswitch)
+        instance.return_value.delete_lswitch.side_effect = (
+            self.fc.delete_lswitch)
+
+    def setUp(self):
+        super(VcnsDriverTestCase, self).setUp()
+
+        n_config.parse(['--config-file', VCNS_CONFIG_FILE])
+
+        self.fc = fake_vcns.FakeVcns()
+        self.mock_vcns = mock.patch(VCNS_NAME, autospec=True)
+        self.vcns_patch()
+
+        self.addCleanup(self.fc.reset_all)
+        self.addCleanup(self.mock_vcns.stop)
+
+        self.vcns_driver = vcns_driver.VcnsDriver(self)
+
+        self.edge_id = None
+        self.result = None
+
+    def _deploy_edge(self):
+        task = self.vcns_driver.deploy_edge(
+            'router-id', 'myedge', 'internal-network', {}, wait_for_exec=True)
+        self.assertEqual(self.edge_id, 'edge-1')
+        task.wait(TaskState.RESULT)
+        return task
+
+    def edge_deploy_started(self, task):
+        self.edge_id = task.userdata['edge_id']
+
+    def edge_deploy_result(self, task):
+        if task.status == TaskStatus.COMPLETED:
+            task.userdata['jobdata']['edge_deploy_result'] = True
+
+    def edge_delete_result(self, task):
+        if task.status == TaskStatus.COMPLETED:
+            task.userdata['jobdata']['edge_delete_result'] = True
+
+    def snat_create_result(self, task):
+        if task.status == TaskStatus.COMPLETED:
+            task.userdata['jobdata']['snat_create_result'] = True
+
+    def snat_delete_result(self, task):
+        if task.status == TaskStatus.COMPLETED:
+            task.userdata['jobdata']['snat_delete_result'] = True
+
+    def dnat_create_result(self, task):
+        if task.status == TaskStatus.COMPLETED:
+            task.userdata['jobdata']['dnat_create_result'] = True
+
+    def dnat_delete_result(self, task):
+        if task.status == TaskStatus.COMPLETED:
+            task.userdata['jobdata']['dnat_delete_result'] = True
+
+    def nat_update_result(self, task):
+        if task.status == TaskStatus.COMPLETED:
+            task.userdata['jobdata']['nat_update_result'] = True
+
+    def routes_update_result(self, task):
+        if task.status == TaskStatus.COMPLETED:
+            task.userdata['jobdata']['routes_update_result'] = True
+
+    def interface_update_result(self, task):
+        if task.status == TaskStatus.COMPLETED:
+            task.userdata['jobdata']['interface_update_result'] = True
+
+    def test_deploy_edge(self):
+        jobdata = {}
+        task = self.vcns_driver.deploy_edge(
+            'router-id', 'myedge', 'internal-network', jobdata=jobdata,
+            wait_for_exec=True)
+        self.assertEqual(self.edge_id, 'edge-1')
+        task.wait(TaskState.RESULT)
+        self.assertEqual(task.status, TaskStatus.COMPLETED)
+        self.assertTrue(jobdata.get('edge_deploy_result'))
+
+    def test_deploy_edge_fail(self):
+        self.vcns_driver.deploy_edge(
+            'router-1', 'myedge', 'internal-network', {}, wait_for_exec=True)
+        task = self.vcns_driver.deploy_edge(
+            'router-2', 'myedge', 'internal-network', {}, wait_for_exec=True)
+        task.wait(TaskState.RESULT)
+        self.assertEqual(task.status, TaskStatus.ERROR)
+
+    def test_get_edge_status(self):
+        self._deploy_edge()
+        status = self.vcns_driver.get_edge_status(self.edge_id)
+        self.assertEqual(status, RouterStatus.ROUTER_STATUS_ACTIVE)
+
+    def test_get_edges(self):
+        self._deploy_edge()
+        edges = self.vcns_driver.get_edges_statuses()
+        found = False
+        for edge_id, status in edges.iteritems():
+            if edge_id == self.edge_id:
+                found = True
+                break
+        self.assertTrue(found)
+
+    def _create_nat_rule(self, edge_id, action, org, translated):
+        jobdata = {}
+        if action == 'snat':
+            task = self.vcns_driver.create_snat_rule(
+                'router-id', edge_id, org, translated, jobdata=jobdata)
+            key = 'snat_create_result'
+        else:
+            task = self.vcns_driver.create_dnat_rule(
+                'router-id', edge_id, org, translated, jobdata=jobdata)
+            key = 'dnat_create_result'
+        task.wait(TaskState.RESULT)
+        self.assertTrue(jobdata.get(key))
+
+    def _delete_nat_rule(self, edge_id, action, addr):
+        jobdata = {}
+        if action == 'snat':
+            task = self.vcns_driver.delete_snat_rule(
+                'router-id', edge_id, addr, jobdata=jobdata)
+            key = 'snat_delete_result'
+        else:
+            task = self.vcns_driver.delete_dnat_rule(
+                'router-id', edge_id, addr, jobdata=jobdata)
+            key = 'dnat_delete_result'
+        task.wait(TaskState.RESULT)
+        self.assertTrue(jobdata.get(key))
+
+    def _test_create_nat_rule(self, action):
+        self._deploy_edge()
+        addr = '192.168.1.1'
+        translated = '10.0.0.1'
+        self._create_nat_rule(self.edge_id, action, addr, translated)
+
+        natcfg = self.vcns_driver.get_nat_config(self.edge_id)
+        for rule in natcfg['rules']['natRulesDtos']:
+            if (rule['originalAddress'] == addr and
+                rule['translatedAddress'] == translated and
+                rule['action'] == action):
+                break
+        else:
+            self.assertTrue(False)
+
+    def _test_delete_nat_rule(self, action):
+        self._deploy_edge()
+        addr = '192.168.1.1'
+        translated = '10.0.0.1'
+        self._create_nat_rule(self.edge_id, action, addr, translated)
+        if action == 'snat':
+            self._delete_nat_rule(self.edge_id, action, addr)
+        else:
+            self._delete_nat_rule(self.edge_id, action, translated)
+        natcfg = self.vcns_driver.get_nat_config(self.edge_id)
+        for rule in natcfg['rules']['natRulesDtos']:
+            if (rule['originalAddress'] == addr and
+                rule['translatedAddress'] == translated and
+                rule['action'] == action):
+                self.assertTrue(False)
+                break
+
+    def test_create_snat_rule(self):
+        self._test_create_nat_rule('snat')
+
+    def test_delete_snat_rule(self):
+        self._test_delete_nat_rule('snat')
+
+    def test_create_dnat_rule(self):
+        self._test_create_nat_rule('dnat')
+
+    def test_delete_dnat_rule(self):
+        self._test_delete_nat_rule('dnat')
+
+    def test_update_nat_rules(self):
+        self._deploy_edge()
+        jobdata = {}
+        snats = [{
+            'src': '192.168.1.0/24',
+            'translated': '10.0.0.1'
+        }, {
+            'src': '192.168.2.0/24',
+            'translated': '10.0.0.2'
+        }, {
+            'src': '192.168.3.0/24',
+            'translated': '10.0.0.3'
+        }
+        ]
+        dnats = [{
+            'dst': '100.0.0.4',
+            'translated': '192.168.1.1'
+        }, {
+            'dst': '100.0.0.5',
+            'translated': '192.168.2.1'
+        }
+        ]
+        task = self.vcns_driver.update_nat_rules(
+            'router-id', self.edge_id, snats, dnats, jobdata=jobdata)
+        task.wait(TaskState.RESULT)
+        self.assertTrue(jobdata.get('nat_update_result'))
+
+        natcfg = self.vcns_driver.get_nat_config(self.edge_id)
+        rules = natcfg['rules']['natRulesDtos']
+        self.assertEqual(len(rules), 2 * len(dnats) + len(snats))
+        self.natEquals(rules[0], dnats[0])
+        self.natEquals(rules[1], self.snat_for_dnat(dnats[0]))
+        self.natEquals(rules[2], dnats[1])
+        self.natEquals(rules[3], self.snat_for_dnat(dnats[1]))
+        self.natEquals(rules[4], snats[0])
+        self.natEquals(rules[5], snats[1])
+        self.natEquals(rules[6], snats[2])
+
+    def snat_for_dnat(self, dnat):
+        return {
+            'src': dnat['translated'],
+            'translated': dnat['dst']
+        }
+
+    def natEquals(self, rule, exp):
+        addr = exp.get('src')
+        if not addr:
+            addr = exp.get('dst')
+
+        self.assertEqual(rule['originalAddress'], addr)
+        self.assertEqual(rule['translatedAddress'], exp['translated'])
+
+    def test_update_routes(self):
+        self._deploy_edge()
+        jobdata = {}
+        routes = [{
+            'cidr': '192.168.1.0/24',
+            'nexthop': '169.254.2.1'
+        }, {
+            'cidr': '192.168.2.0/24',
+            'nexthop': '169.254.2.1'
+        }, {
+            'cidr': '192.168.3.0/24',
+            'nexthop': '169.254.2.1'
+        }
+        ]
+        task = self.vcns_driver.update_routes(
+            'router-id', self.edge_id, '10.0.0.1', routes, jobdata=jobdata)
+        task.wait(TaskState.RESULT)
+        self.assertTrue(jobdata.get('routes_update_result'))
+
+    def test_update_interface(self):
+        self._deploy_edge()
+        jobdata = {}
+        task = self.vcns_driver.update_interface(
+            'router-id', self.edge_id, vcns_const.EXTERNAL_VNIC_INDEX,
+            'network-id', address='100.0.0.3', netmask='255.255.255.0',
+            jobdata=jobdata)
+        task.wait(TaskState.RESULT)
+        self.assertTrue(jobdata.get('interface_update_result'))
+
+    def test_delete_edge(self):
+        self._deploy_edge()
+        jobdata = {}
+        task = self.vcns_driver.delete_edge(
+            'router-id', self.edge_id, jobdata=jobdata)
+        task.wait(TaskState.RESULT)
+        self.assertTrue(jobdata.get('edge_delete_result'))
+
+    def test_create_lswitch(self):
+        tz_config = [{
+            'transport_zone_uuid': 'tz-uuid'
+        }]
+        lswitch = self.vcns_driver.create_lswitch('lswitch', tz_config)
+        self.assertEqual(lswitch['display_name'], 'lswitch')
+        self.assertEqual(lswitch['type'], 'LogicalSwitchConfig')
+        self.assertIn('uuid', lswitch)
+
+    def test_delete_lswitch(self):
+        tz_config = {
+            'transport_zone_uuid': 'tz-uuid'
+        }
+        lswitch = self.vcns_driver.create_lswitch('lswitch', tz_config)
+        self.vcns_driver.delete_lswitch(lswitch['uuid'])
diff --git a/neutron/tests/unit/nicira/vshield/__init__.py b/neutron/tests/unit/nicira/vshield/__init__.py
new file mode 100644 (file)
index 0000000..5e8da71
--- /dev/null
@@ -0,0 +1,16 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 OpenStack Foundation.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
diff --git a/neutron/tests/unit/nicira/vshield/common/__init__.py b/neutron/tests/unit/nicira/vshield/common/__init__.py
new file mode 100644 (file)
index 0000000..5e8da71
--- /dev/null
@@ -0,0 +1,16 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 OpenStack Foundation.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
diff --git a/neutron/tests/unit/nicira/vshield/fake_vcns.py b/neutron/tests/unit/nicira/vshield/fake_vcns.py
new file mode 100644 (file)
index 0000000..cfc9137
--- /dev/null
@@ -0,0 +1,249 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 VMware, Inc
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: linb, VMware
+
+import copy
+import json
+
+from neutron.openstack.common import uuidutils
+
+
+class FakeVcns(object):
+
+    def __init__(self, unique_router_name=True):
+        self._jobs = {}
+        self._job_idx = 0
+        self._edges = {}
+        self._edge_idx = 0
+        self._lswitches = {}
+        self._unique_router_name = unique_router_name
+        self._fake_nvpapi = None
+
+    def set_fake_nvpapi(self, fake_nvpapi):
+        self._fake_nvpapi = fake_nvpapi
+
+    def _validate_edge_name(self, name):
+        for edge_id, edge in self._edges.iteritems():
+            if edge['name'] == name:
+                return False
+        return True
+
+    def deploy_edge(self, request):
+        if (self._unique_router_name and
+            not self._validate_edge_name(request['name'])):
+            header = {
+                'status': 400
+            }
+            msg = ('Edge name should be unique for tenant. Edge %s '
+                   'already exists for default tenant.') % request['name']
+            response = {
+                'details': msg,
+                'errorCode': 10085,
+                'rootCauseString': None,
+                'moduleName': 'vShield Edge',
+                'errorData': None
+            }
+            return (header, json.dumps(response))
+
+        self._job_idx = self._job_idx + 1
+        job_id = "jobdata-%d" % self._job_idx
+        self._edge_idx = self._edge_idx + 1
+        edge_id = "edge-%d" % self._edge_idx
+        self._jobs[job_id] = edge_id
+        self._edges[edge_id] = {
+            'name': request['name'],
+            'request': request,
+            'nat_rules': None,
+            'nat_rule_id': 0
+        }
+        header = {
+            'status': 200,
+            'location': 'https://host/api/4.0/jobs/%s' % job_id
+        }
+        response = ''
+        return (header, response)
+
+    def get_edge_id(self, job_id):
+        if job_id not in self._jobs:
+            raise Exception(_("Job %s does not nexist") % job_id)
+
+        header = {
+            'status': 200
+        }
+        response = {
+            'edgeId': self._jobs[job_id]
+        }
+        return (header, response)
+
+    def get_edge_deploy_status(self, edge_id):
+        if edge_id not in self._edges:
+            raise Exception(_("Edge %s does not exist") % edge_id)
+        header = {
+            'status': 200,
+        }
+        response = {
+            'systemStatus': 'good'
+        }
+        return (header, response)
+
+    def delete_edge(self, edge_id):
+        if edge_id not in self._edges:
+            raise Exception(_("Edge %s does not exist") % edge_id)
+        del self._edges[edge_id]
+        header = {
+            'status': 200
+        }
+        response = ''
+        return (header, response)
+
+    def update_interface(self, edge_id, vnic):
+        header = {
+            'status': 200
+        }
+        response = ''
+        return (header, response)
+
+    def get_nat_config(self, edge_id):
+        if edge_id not in self._edges:
+            raise Exception(_("Edge %s does not exist") % edge_id)
+        edge = self._edges[edge_id]
+        rules = edge['nat_rules']
+        if rules is None:
+            rules = {
+                'rules': {
+                    'natRulesDtos': []
+                },
+                'version': 1
+            }
+        header = {
+            'status': 200
+        }
+        rules['version'] = 1
+        return (header, rules)
+
+    def update_nat_config(self, edge_id, nat):
+        if edge_id not in self._edges:
+            raise Exception(_("Edge %s does not exist") % edge_id)
+        edge = self._edges[edge_id]
+        max_rule_id = edge['nat_rule_id']
+        rules = copy.deepcopy(nat)
+        for rule in rules['rules']['natRulesDtos']:
+            rule_id = rule.get('ruleId', 0)
+            if rule_id > max_rule_id:
+                max_rule_id = rule_id
+        for rule in rules['rules']['natRulesDtos']:
+            if 'ruleId' not in rule:
+                max_rule_id = max_rule_id + 1
+                rule['ruleId'] = max_rule_id
+        edge['nat_rules'] = rules
+        edge['nat_rule_id'] = max_rule_id
+        header = {
+            'status': 200
+        }
+        response = ''
+        return (header, response)
+
+    def delete_nat_rule(self, edge_id, rule_id):
+        if edge_id not in self._edges:
+            raise Exception(_("Edge %s does not exist") % edge_id)
+
+        edge = self._edges[edge_id]
+        rules = edge['nat_rules']
+        rule_to_delete = None
+        for rule in rules['rules']['natRulesDtos']:
+            if rule_id == rule['ruleId']:
+                rule_to_delete = rule
+                break
+        if rule_to_delete is None:
+            raise Exception(_("Rule id %d doest not exist") % rule_id)
+
+        rules['rules']['natRulesDtos'].remove(rule_to_delete)
+
+        header = {
+            'status': 200
+        }
+        response = ''
+        return (header, response)
+
+    def get_edge_status(self, edge_id):
+        if edge_id not in self._edges:
+            raise Exception(_("Edge %s does not exist") % edge_id)
+
+        header = {
+            'status': 200
+        }
+        response = {
+            'edgeStatus': 'GREEN'
+        }
+        return (header, response)
+
+    def get_edges(self):
+        header = {
+            'status': 200
+        }
+        edges = []
+        for edge_id in self._edges:
+            edges.append({
+                'id': edge_id,
+                'edgeStatus': 'GREEN'
+            })
+        response = {
+            'edgePage': {
+                'data': edges
+            }
+        }
+        return (header, response)
+
+    def update_routes(self, edge_id, routes):
+        header = {
+            'status': 200
+        }
+        response = ''
+        return (header, response)
+
+    def create_lswitch(self, lsconfig):
+        # The lswitch is created via VCNS API so the fake nvpapi wont
+        # see it. Added to fake nvpapi here.
+        if self._fake_nvpapi:
+            lswitch = self._fake_nvpapi._add_lswitch(json.dumps(lsconfig))
+        else:
+            lswitch = lsconfig
+            lswitch['uuid'] = uuidutils.generate_uuid()
+        self._lswitches[lswitch['uuid']] = lswitch
+        header = {
+            'status': 200
+        }
+        lswitch['_href'] = '/api/ws.v1/lswitch/%s' % lswitch['uuid']
+        return (header, lswitch)
+
+    def delete_lswitch(self, id):
+        if id not in self._lswitches:
+            raise Exception(_("Lswitch %s does not exist") % id)
+        del self._lswitches[id]
+        if self._fake_nvpapi:
+            # TODO(fank): fix the hack
+            del self._fake_nvpapi._fake_lswitch_dict[id]
+        header = {
+            'status': 200
+        }
+        response = ''
+        return (header, response)
+
+    def reset_all(self):
+        self._jobs.clear()
+        self._edges.clear()
+        self._lswitches.clear()