]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Add nvp qos extension
authorAaron Rosen <arosen@nicira.com>
Wed, 13 Feb 2013 22:49:29 +0000 (14:49 -0800)
committerAaron Rosen <arosen@nicira.com>
Sun, 17 Feb 2013 05:17:12 +0000 (21:17 -0800)
Implements blueprint nvp-qos-extension

Change-Id: I8ad980128407c6ddb57e5f928663e0df15cc0065

etc/policy.json
quantum/plugins/nicira/nicira_nvp_plugin/QuantumPlugin.py
quantum/plugins/nicira/nicira_nvp_plugin/extensions/__init__.py [new file with mode: 0644]
quantum/plugins/nicira/nicira_nvp_plugin/extensions/nvp_qos.py [new file with mode: 0644]
quantum/plugins/nicira/nicira_nvp_plugin/nicira_qos_db.py [new file with mode: 0644]
quantum/plugins/nicira/nicira_nvp_plugin/nvplib.py
quantum/tests/unit/nicira/etc/fake_post_lqueue.json [new file with mode: 0644]
quantum/tests/unit/nicira/fake_nvpapiclient.py
quantum/tests/unit/nicira/test_nicira_plugin.py

index fa7efe81219b66d7aad9c280e2c656528a435dbb..cdaad0d17ade2b99341ced790e0f8b1ca4fddeaf 100644 (file)
@@ -49,5 +49,9 @@
     "create_service_type": "rule:admin_only",
     "update_service_type": "rule:admin_only",
     "delete_service_type": "rule:admin_only",
-    "get_service_type": "rule:regular_user"
+    "get_service_type": "rule:regular_user",
+
+    "create_qos_queue:": "rule:admin_only",
+    "get_qos_queue:": "rule:admin_only",
+    "get_qos_queues:": "rule:admin_only"
 }
index 5e2db7c649ef0ec65bd614a384f3bdecf8cb8907..83956d76d63fe404f663a286f04b27fa903ff78e 100644 (file)
@@ -53,13 +53,15 @@ from quantum import policy
 from quantum.plugins.nicira.nicira_nvp_plugin.common import config
 from quantum.plugins.nicira.nicira_nvp_plugin.common import (exceptions
                                                              as nvp_exc)
+from quantum.plugins.nicira.nicira_nvp_plugin.extensions import (nvp_qos
+                                                                 as ext_qos)
 from quantum.plugins.nicira.nicira_nvp_plugin import nicira_db
 from quantum.plugins.nicira.nicira_nvp_plugin import NvpApiClient
 from quantum.plugins.nicira.nicira_nvp_plugin import nvplib
 from quantum.plugins.nicira.nicira_nvp_plugin import nvp_cluster
 from quantum.plugins.nicira.nicira_nvp_plugin.nvp_plugin_version import (
     PLUGIN_VERSION)
-
+from quantum.plugins.nicira.nicira_nvp_plugin import nicira_qos_db as qos_db
 LOG = logging.getLogger("QuantumPlugin")
 NVP_FLOATINGIP_NAT_RULES_ORDER = 200
 NVP_EXTGW_NAT_RULES_ORDER = 255
@@ -123,14 +125,14 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
                   l3_db.L3_NAT_db_mixin,
                   portsecurity_db.PortSecurityDbMixin,
                   securitygroups_db.SecurityGroupDbMixin,
-                  nvp_sec.NVPSecurityGroups):
+                  nvp_sec.NVPSecurityGroups, qos_db.NVPQoSDbMixin):
     """
     NvpPluginV2 is a Quantum plugin that provides L2 Virtual Network
     functionality using NVP.
     """
 
     supported_extension_aliases = ["provider", "quotas", "port-security",
-                                   "router", "security-group"]
+                                   "router", "security-group", "nvp-qos"]
     __native_bulk_support = True
 
     # Default controller cluster
@@ -355,7 +357,8 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
                                         port_data['mac_address'],
                                         port_data['fixed_ips'],
                                         port_data[psec.PORTSECURITY],
-                                        port_data[ext_sg.SECURITYGROUPS])
+                                        port_data[ext_sg.SECURITYGROUPS],
+                                        port_data[ext_qos.QUEUE])
             nicira_db.add_quantum_nvp_port_mapping(
                 context.session, port_data['id'], lport['uuid'])
             d_owner = port_data['device_owner']
@@ -740,9 +743,18 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
                                                               network)
             # Ensure there's an id in net_data
             net_data['id'] = new_net['id']
+            # Process port security extension
             self._process_network_create_port_security(context, net_data)
             # DB Operations for setting the network as external
             self._process_l3_create(context, net_data, new_net['id'])
+            # Process QoS queue extension
+            if network['network'].get(ext_qos.QUEUE):
+                new_net[ext_qos.QUEUE] = network['network'][ext_qos.QUEUE]
+                # Raises if not found
+                self.get_qos_queue(context, new_net[ext_qos.QUEUE])
+                self._process_network_queue_mapping(context, new_net)
+                self._extend_network_qos_queue(context, new_net)
+
             if net_data.get(pnet.NETWORK_TYPE):
                 net_binding = nicira_db.add_network_binding(
                     context.session, new_net['id'],
@@ -827,28 +839,6 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
         return pairs
 
     def get_network(self, context, id, fields=None):
-        """
-        Retrieves all attributes of the network, NOT including
-        the ports of that network.
-
-        :returns: a sequence of mappings with the following signature:
-                    {'id': UUID representing the network.
-                     'name': Human-readable name identifying the network.
-                     'tenant_id': Owner of network. only admin user
-                                  can specify a tenant_id other than its own.
-                     'admin_state_up': Sets admin state of network. if down,
-                                       network does not forward packets.
-                     'status': Indicates whether network is currently
-                               operational (limit values to "ACTIVE", "DOWN",
-                               "BUILD", and "ERROR"?
-                     'subnets': Subnets associated with this network. Plan
-                                to allow fully specified subnets as part of
-                                network create.
-                   }
-
-        :raises: exception.NetworkNotFound
-        :raises: exception.QuantumException
-        """
         with context.session.begin(subtransactions=True):
             # goto to the plugin DB and fetch the network
             network = self._get_network(context, id)
@@ -892,6 +882,7 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
             self._extend_network_dict_provider(context, net_result)
             self._extend_network_port_security_dict(context, net_result)
             self._extend_network_dict_l3(context, net_result)
+            self._extend_network_qos_queue(context, net_result)
         return self._fields(net_result, fields)
 
     def get_networks(self, context, filters=None, fields=None):
@@ -904,6 +895,8 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
                 self._extend_network_dict_provider(context, net)
                 self._extend_network_port_security_dict(context, net)
                 self._extend_network_dict_l3(context, net)
+                self._extend_network_qos_queue(context, net)
+
             quantum_lswitches = self._filter_nets_l3(context,
                                                      quantum_lswitches,
                                                      filters)
@@ -981,10 +974,15 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
             if psec.PORTSECURITY in network['network']:
                 self._update_network_security_binding(
                     context, id, network['network'][psec.PORTSECURITY])
+            if network['network'].get(ext_qos.QUEUE):
+                net[ext_qos.QUEUE] = network['network'][ext_qos.QUEUE]
+                self._delete_network_queue_mapping(context, id)
+                self._process_network_queue_mapping(context, net)
             self._extend_network_port_security_dict(context, net)
             self._process_l3_update(context, network['network'], id)
             self._extend_network_dict_provider(context, net)
             self._extend_network_dict_l3(context, net)
+            self._extend_network_qos_queue(context, net)
         return net
 
     def get_ports(self, context, filters=None, fields=None):
@@ -1128,6 +1126,10 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
                 self._get_security_groups_on_port(context, port))
             self._process_port_create_security_group(
                 context, quantum_db['id'], port_data[ext_sg.SECURITYGROUPS])
+            # QoS extension checks
+            port_data[ext_qos.QUEUE] = self._check_for_queue_and_create(
+                context, port_data)
+            self._process_port_queue_mapping(context, port_data)
             # provider networking extension checks
             # Fetch the network and network binding from Quantum db
             try:
@@ -1148,8 +1150,11 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
             LOG.debug(_("create_port completed on NVP for tenant "
                         "%(tenant_id)s: (%(id)s)"), port_data)
 
+            # remove since it will be added in extend based on policy
+            del port_data[ext_qos.QUEUE]
             self._extend_port_port_security_dict(context, port_data)
             self._extend_port_dict_security_group(context, port_data)
+            self._extend_port_qos_queue(context, port_data)
         return port_data
 
     def update_port(self, context, id, port):
@@ -1207,6 +1212,11 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
             if psec.PORTSECURITY in port['port']:
                 self._update_port_security_binding(
                     context, id, ret_port[psec.PORTSECURITY])
+
+            ret_port[ext_qos.QUEUE] = self._check_for_queue_and_create(
+                context, ret_port)
+            self._delete_port_queue_mapping(context, ret_port['id'])
+            self._process_port_queue_mapping(context, ret_port)
             self._extend_port_port_security_dict(context, ret_port)
             self._extend_port_dict_security_group(context, ret_port)
             LOG.debug(_("Update port request: %s"), port)
@@ -1219,8 +1229,12 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
                                ret_port['mac_address'],
                                ret_port['fixed_ips'],
                                ret_port[psec.PORTSECURITY],
-                               ret_port[ext_sg.SECURITYGROUPS])
+                               ret_port[ext_sg.SECURITYGROUPS],
+                               ret_port[ext_qos.QUEUE])
 
+            # remove since it will be added in extend based on policy
+            del ret_port[ext_qos.QUEUE]
+            self._extend_port_qos_queue(context, ret_port)
         # Update the port status from nvp. If we fail here hide it since
         # the port was successfully updated but we were not able to retrieve
         # the status.
@@ -1244,11 +1258,15 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
         port_delete_func(context, quantum_db_port)
         self.disassociate_floatingips(context, id)
         with context.session.begin(subtransactions=True):
+            queue = self._get_port_queue_bindings(context, {'port_id': [id]})
             if (cfg.CONF.metadata_dhcp_host_route and
                 quantum_db_port.device_owner == constants.DEVICE_OWNER_DHCP):
                     self._ensure_metadata_host_route(
                         context, quantum_db_port.fixed_ips[0], is_delete=True)
             super(NvpPluginV2, self).delete_port(context, id)
+            # Delete qos queue if possible
+            if queue:
+                self.delete_qos_queue(context, queue[0]['queue_id'], False)
 
     def get_port(self, context, id, fields=None):
         with context.session.begin(subtransactions=True):
@@ -1256,6 +1274,7 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
                                                                 id, fields)
             self._extend_port_port_security_dict(context, quantum_db_port)
             self._extend_port_dict_security_group(context, quantum_db_port)
+            self._extend_port_qos_queue(context, quantum_db_port)
 
             if self._network_is_external(context,
                                          quantum_db_port['network_id']):
@@ -1869,3 +1888,40 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
                 self.default_cluster, sgid, current_rules)
             return super(NvpPluginV2, self).delete_security_group_rule(context,
                                                                        sgrid)
+
+    def create_qos_queue(self, context, qos_queue, check_policy=True):
+        if check_policy:
+            self._enforce_set_auth(context, qos_queue,
+                                   ext_qos.qos_queue_create)
+        q = qos_queue.get('qos_queue')
+        self._validate_qos_queue(context, q)
+        q['id'] = nvplib.create_lqueue(self.default_cluster,
+                                       self._nvp_lqueue(q))
+        return super(NvpPluginV2, self).create_qos_queue(context, qos_queue)
+
+    def delete_qos_queue(self, context, id, raise_in_use=True):
+        filters = {'queue_id': [id]}
+        queues = self._get_port_queue_bindings(context, filters)
+        if queues:
+            if raise_in_use:
+                raise ext_qos.QueueInUseByPort()
+            else:
+                return
+        nvplib.delete_lqueue(self.default_cluster, id)
+        return super(NvpPluginV2, self).delete_qos_queue(context, id)
+
+    def get_qos_queue(self, context, id, fields=None):
+        if not self._check_view_auth(context, {'qos_queue': None},
+                                     ext_qos.qos_queue_get):
+            # don't want the user to find out that they guessed the right id
+            # so  we raise not found if the policy.json file doesn't allow them
+            raise ext_qos.QueueNotFound(id=id)
+
+        return super(NvpPluginV2, self).get_qos_queue(context, id, fields)
+
+    def get_qos_queues(self, context, filters=None, fields=None):
+        if not self._check_view_auth(context, {'qos_queue': []},
+                                     ext_qos.qos_queue_list):
+            return []
+        return super(NvpPluginV2, self).get_qos_queues(context, filters,
+                                                       fields)
diff --git a/quantum/plugins/nicira/nicira_nvp_plugin/extensions/__init__.py b/quantum/plugins/nicira/nicira_nvp_plugin/extensions/__init__.py
new file mode 100644 (file)
index 0000000..5f67d78
--- /dev/null
@@ -0,0 +1,18 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Nicira, Inc.
+# All Rights Reserved
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: Aaron Rosen, Nicira Networks, Inc.
diff --git a/quantum/plugins/nicira/nicira_nvp_plugin/extensions/nvp_qos.py b/quantum/plugins/nicira/nicira_nvp_plugin/extensions/nvp_qos.py
new file mode 100644 (file)
index 0000000..9ecc7f8
--- /dev/null
@@ -0,0 +1,202 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Nicira, Inc.
+# All Rights Reserved
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: Aaron Rosen, Nicira Networks, Inc.
+
+
+from abc import abstractmethod
+
+from quantum.api.v2 import attributes as attr
+from quantum.api.v2 import base
+from quantum.api import extensions
+from quantum.common import exceptions as qexception
+from quantum import manager
+
+
+# For policy.json/Auth
+qos_queue_create = "create_qos_queue"
+qos_queue_delete = "delete_qos_queue"
+qos_queue_get = "get_qos_queue"
+qos_queue_list = "get_qos_queues"
+
+
+class DefaultQueueCreateNotAdmin(qexception.InUse):
+    message = _("Need to be admin in order to create queue called default")
+
+
+class DefaultQueueAlreadyExists(qexception.InUse):
+    message = _("Default queue already exists.")
+
+
+class QueueInvalidDscp(qexception.InvalidInput):
+    message = _("Invalid value for dscp %(data)s must be integer.")
+
+
+class QueueMinGreaterMax(qexception.InvalidInput):
+    message = _("Invalid bandwidth rate, min greater than max.")
+
+
+class QueueInvalidBandwidth(qexception.InvalidInput):
+    message = _("Invalid bandwidth rate, %(data)s must be a non negative"
+                " integer.")
+
+
+class MissingDSCPForTrusted(qexception.InvalidInput):
+    message = _("No DSCP field needed when QoS workload marked trusted")
+
+
+class QueueNotFound(qexception.NotFound):
+    message = _("Queue %(id)s does not exist")
+
+
+class QueueInUseByPort(qexception.InUse):
+    message = _("Unable to delete queue attached to port.")
+
+
+class QueuePortBindingNotFound(qexception.NotFound):
+    message = _("Port is not associated with lqueue")
+
+
+def convert_to_unsigned_int_or_none(val):
+    if val is None:
+        return
+    try:
+        val = int(val)
+        if val < 0:
+            raise ValueError
+    except (ValueError, TypeError):
+        msg = _("'%s' must be a non negative integer.") % val
+        raise qexception.InvalidInput(error_message=msg)
+    return val
+
+# Attribute Map
+RESOURCE_ATTRIBUTE_MAP = {
+    'qos_queues': {
+        'id': {'allow_post': False, 'allow_put': False,
+               'is_visible': True},
+        'default': {'allow_post': True, 'allow_put': False,
+                    'convert_to': attr.convert_to_boolean,
+                    'is_visible': True, 'default': False},
+        'name': {'allow_post': True, 'allow_put': False,
+                 'validate': {'type:string': None},
+                 'is_visible': True, 'default': ''},
+        'min': {'allow_post': True, 'allow_put': False,
+                'is_visible': True, 'default': '0',
+                'convert_to': convert_to_unsigned_int_or_none},
+        'max': {'allow_post': True, 'allow_put': False,
+                'is_visible': True, 'default': None,
+                'convert_to': convert_to_unsigned_int_or_none},
+        'qos_marking': {'allow_post': True, 'allow_put': False,
+                        'validate': {'type:values': ['untrusted', 'trusted']},
+                        'default': 'untrusted', 'is_visible': True},
+        'dscp': {'allow_post': True, 'allow_put': False,
+                 'is_visible': True, 'default': '0',
+                 'convert_to': convert_to_unsigned_int_or_none},
+        'tenant_id': {'allow_post': True, 'allow_put': False,
+                      'required_by_policy': True,
+                      'validate': {'type:string': None},
+                      'is_visible': True},
+    },
+}
+
+
+QUEUE = 'queue_id'
+RXTX_FACTOR = 'rxtx_factor'
+EXTENDED_ATTRIBUTES_2_0 = {
+    'ports': {
+        RXTX_FACTOR: {'allow_post': True,
+                      'allow_put': False,
+                      'is_visible': False,
+                      'default': 1,
+                      'convert_to': convert_to_unsigned_int_or_none},
+
+        QUEUE: {'allow_post': False,
+                'allow_put': False,
+                'is_visible': True,
+                'default': False}},
+    'networks': {QUEUE: {'allow_post': True,
+                         'allow_put': True,
+                         'is_visible': True,
+                         'default': False}}
+
+}
+
+
+class Nvp_qos(object):
+    """Port Queue extension"""
+
+    @classmethod
+    def get_name(cls):
+        return "nvp-qos"
+
+    @classmethod
+    def get_alias(cls):
+        return "nvp-qos"
+
+    @classmethod
+    def get_description(cls):
+        return "NVP QoS extension."
+
+    @classmethod
+    def get_namespace(cls):
+        return "http://docs.openstack.org/ext/nvp-qos/api/v2.0"
+
+    @classmethod
+    def get_updated(cls):
+        return "2012-10-05T10:00:00-00:00"
+
+    @classmethod
+    def get_resources(cls):
+        """ Returns Ext Resources """
+        exts = []
+        plugin = manager.QuantumManager.get_plugin()
+        resource_name = 'qos_queue'
+        collection_name = resource_name.replace('_', '-') + "s"
+        params = RESOURCE_ATTRIBUTE_MAP.get(resource_name + "s", dict())
+        controller = base.create_resource(collection_name,
+                                          resource_name,
+                                          plugin, params, allow_bulk=False)
+
+        ex = extensions.ResourceExtension(collection_name,
+                                          controller)
+        exts.append(ex)
+
+        return exts
+
+    def get_extended_resources(self, version):
+        if version == "2.0":
+            return EXTENDED_ATTRIBUTES_2_0
+        else:
+            return {}
+
+
+class QueuePluginBase(object):
+    @abstractmethod
+    def create_qos_queue(self, context, queue):
+        pass
+
+    @abstractmethod
+    def delete_qos_queue(self, context, id):
+        pass
+
+    @abstractmethod
+    def get_qos_queue(self, context, id, fields=None):
+        pass
+
+    @abstractmethod
+    def get_qos_queues(self, context, filters=None, fields=None):
+        pass
diff --git a/quantum/plugins/nicira/nicira_nvp_plugin/nicira_qos_db.py b/quantum/plugins/nicira/nicira_nvp_plugin/nicira_qos_db.py
new file mode 100644 (file)
index 0000000..c794124
--- /dev/null
@@ -0,0 +1,300 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 Nicira Networks, Inc.  All rights reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: Aaron Rosen, Nicira, Inc
+
+import sqlalchemy as sa
+from sqlalchemy.orm import exc
+
+from quantum.api.v2 import attributes as attr
+from quantum.db import model_base
+from quantum.db import models_v2
+from quantum.openstack.common import uuidutils
+from quantum.plugins.nicira.nicira_nvp_plugin.extensions import (nvp_qos
+                                                                 as ext_qos)
+
+
+class QoSQueue(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
+    name = sa.Column(sa.String(255))
+    default = sa.Column(sa.Boolean, default=False)
+    min = sa.Column(sa.Integer, nullable=False)
+    max = sa.Column(sa.Integer, nullable=True)
+    qos_marking = sa.Column(sa.Enum('untrusted', 'trusted',
+                                    name='qosqueues_qos_marking'))
+    dscp = sa.Column(sa.Integer)
+
+
+class PortQueueMapping(model_base.BASEV2):
+    port_id = sa.Column(sa.String(36),
+                        sa.ForeignKey("ports.id", ondelete="CASCADE"),
+                        primary_key=True)
+
+    queue_id = sa.Column(sa.String(36), sa.ForeignKey("qosqueues.id"),
+                         primary_key=True)
+
+
+class NetworkQueueMapping(model_base.BASEV2):
+    network_id = sa.Column(sa.String(36),
+                           sa.ForeignKey("networks.id", ondelete="CASCADE"),
+                           primary_key=True)
+
+    queue_id = sa.Column(sa.String(36), sa.ForeignKey("qosqueues.id",
+                                                      ondelete="CASCADE"))
+
+
+class NVPQoSDbMixin(ext_qos.QueuePluginBase):
+    """Mixin class to add queues."""
+
+    def create_qos_queue(self, context, qos_queue):
+        q = qos_queue['qos_queue']
+        with context.session.begin(subtransactions=True):
+            qos_queue = QoSQueue(id=q.get('id', uuidutils.generate_uuid()),
+                                 name=q.get('name'),
+                                 tenant_id=q['tenant_id'],
+                                 default=q.get('default'),
+                                 min=q.get('min'),
+                                 max=q.get('max'),
+                                 qos_marking=q.get('qos_marking'),
+                                 dscp=q.get('dscp'))
+            context.session.add(qos_queue)
+        return self._make_qos_queue_dict(qos_queue)
+
+    def get_qos_queue(self, context, id, fields=None):
+        return self._make_qos_queue_dict(
+            self._get_qos_queue(context, id), fields)
+
+    def _get_qos_queue(self, context, id):
+        try:
+            return self._get_by_id(context, QoSQueue, id)
+        except exc.NoResultFound:
+            raise ext_qos.QueueNotFound(id=id)
+
+    def get_qos_queues(self, context, filters=None, fields=None):
+        return self._get_collection(context, QoSQueue,
+                                    self._make_qos_queue_dict,
+                                    filters=filters, fields=fields)
+
+    def delete_qos_queue(self, context, id):
+        qos_queue = self._get_qos_queue(context, id)
+        with context.session.begin(subtransactions=True):
+            context.session.delete(qos_queue)
+
+    def _process_port_queue_mapping(self, context, p):
+        if not p.get(ext_qos.QUEUE):
+            return
+        with context.session.begin(subtransactions=True):
+            db = PortQueueMapping(port_id=p['id'],
+                                  queue_id=p.get(ext_qos.QUEUE))
+            context.session.add(db)
+
+    def _get_port_queue_bindings(self, context, filters=None, fields=None):
+        return self._get_collection(context, PortQueueMapping,
+                                    self._make_port_queue_binding_dict,
+                                    filters=filters, fields=fields)
+
+    def _delete_port_queue_mapping(self, context, port_id):
+        query = self._model_query(context, PortQueueMapping)
+        try:
+            binding = query.filter(PortQueueMapping.port_id == port_id).one()
+        except exc.NoResultFound:
+            # return since this can happen if we are updating a port that
+            # did not already have a queue on it. There is no need to check
+            # if there is one before deleting if we return here.
+            return
+        with context.session.begin(subtransactions=True):
+            context.session.delete(binding)
+
+    def _process_network_queue_mapping(self, context, network):
+        if not network.get(ext_qos.QUEUE):
+            return
+        with context.session.begin(subtransactions=True):
+            db = NetworkQueueMapping(network_id=network['id'],
+                                     queue_id=network.get(ext_qos.QUEUE))
+            context.session.add(db)
+
+    def _get_network_queue_bindings(self, context, filters=None, fields=None):
+        return self._get_collection(context, NetworkQueueMapping,
+                                    self._make_network_queue_binding_dict,
+                                    filters=filters, fields=fields)
+
+    def _delete_network_queue_mapping(self, context, network_id):
+        query = self._model_query(context, NetworkQueueMapping)
+        try:
+            with context.session.begin(subtransactions=True):
+                binding = query.filter_by(network_id=network_id).first()
+                if binding:
+                    context.session.delete(binding)
+        except exc.NoResultFound:
+            # return since this can happen if we are updating a port that
+            # did not already have a queue on it. There is no need to check
+            # if there is one before deleting if we return here.
+            return
+
+    def _extend_port_qos_queue(self, context, port):
+        if self._check_view_auth(context, {'qos_queue': None},
+                                 ext_qos.qos_queue_get):
+            filters = {'port_id': [port['id']]}
+            fields = ['queue_id']
+            port[ext_qos.QUEUE] = None
+            queue_id = self._get_port_queue_bindings(
+                context, filters, fields)
+            if queue_id:
+                port[ext_qos.QUEUE] = queue_id[0]['queue_id']
+        return port
+
+    def _extend_network_qos_queue(self, context, network):
+        if self._check_view_auth(context, {'qos_queue': None},
+                                 ext_qos.qos_queue_get):
+            filters = {'network_id': [network['id']]}
+            fields = ['queue_id']
+            network[ext_qos.QUEUE] = None
+            queue_id = self._get_network_queue_bindings(
+                context, filters, fields)
+            if queue_id:
+                network[ext_qos.QUEUE] = queue_id[0]['queue_id']
+        return network
+
+    def _make_qos_queue_dict(self, queue, fields=None):
+        res = {'id': queue['id'],
+               'name': queue.get('name'),
+               'default': queue.get('default'),
+               'tenant_id': queue['tenant_id'],
+               'min': queue.get('min'),
+               'max': queue.get('max'),
+               'qos_marking': queue.get('qos_marking'),
+               'dscp': queue.get('dscp')}
+        return self._fields(res, fields)
+
+    def _make_port_queue_binding_dict(self, queue, fields=None):
+        res = {'port_id': queue['port_id'],
+               'queue_id': queue['queue_id']}
+        return self._fields(res, fields)
+
+    def _make_network_queue_binding_dict(self, queue, fields=None):
+        res = {'network_id': queue['network_id'],
+               'queue_id': queue['queue_id']}
+        return self._fields(res, fields)
+
+    def _check_for_queue_and_create(self, context, port):
+        """This function determines if a port should be associated with a
+        queue. It works by first querying NetworkQueueMapping to determine
+        if the network is associated with a queue. If so, then it queries
+        NetworkQueueMapping for all the networks that are associated with
+        this queue. Next, it queries against all the ports on these networks
+        with the port device_id. Finally it queries PortQueueMapping. If that
+        query returns a queue_id that is returned. Otherwise a queue is
+        created that is the size of the queue associated with the network and
+        that queue_id is returned.
+
+        If the network is not associated with a queue we then query to see
+        if there is a default queue in the system. If so, a copy of that is
+        created and the queue_id is returned.
+
+        Otherwise None is returned. None is also returned if the port does not
+        have a device_id or if the device_owner is network:
+        """
+
+        queue_to_create = None
+        # If there is no device_id don't create a queue. The queue will be
+        # created on update port when the device_id is present. Also don't
+        # apply QoS to network ports.
+        if (not port.get('device_id') or
+            port['device_owner'].startswith('network:')):
+            return
+
+        # Check if there is a queue assocated with the network
+        filters = {'network_id': [port['network_id']]}
+        network_queue_id = self._get_network_queue_bindings(
+            context, filters, ['queue_id'])
+
+        if network_queue_id:
+            # get networks that queue is assocated with
+            filters = {'queue_id': [network_queue_id[0]['queue_id']]}
+            networks_with_same_queue = self._get_network_queue_bindings(
+                context, filters)
+
+            # get the ports on these networks with the same_queue and device_id
+            filters = {'device_id': [port.get('device_id')],
+                       'network_id': [network['network_id'] for
+                                      network in networks_with_same_queue]}
+            query = self._model_query(context, models_v2.Port)
+            ports = self._apply_filters_to_query(query, models_v2.Port,
+                                                 filters).all()
+
+            if ports:
+                # shared queue already exists find the queue id
+                filters = {'port_id': [p['id'] for p in ports]}
+                queues = self._get_port_queue_bindings(context, filters,
+                                                       ['queue_id'])
+                if queues:
+                    return queues[0]['queue_id']
+
+            # get the size of the queue we want to create
+            queue_to_create = self._get_qos_queue(
+                context, network_queue_id[0]['queue_id'])
+
+        else:
+            # check for default queue
+            filters = {'default': [True]}
+            # context is elevated since default queue is owned by admin
+            queue_to_create = self.get_qos_queues(context.elevated(), filters)
+            if not queue_to_create:
+                return
+            queue_to_create = queue_to_create[0]
+
+        # create the queue
+        tenant_id = self._get_tenant_id_for_create(context, port)
+        if port.get(ext_qos.RXTX_FACTOR) and queue_to_create.get('max'):
+            queue_to_create['max'] *= int(port[ext_qos.RXTX_FACTOR])
+        queue = {'qos_queue': {'name': queue_to_create.get('name'),
+                               'min': queue_to_create.get('min'),
+                               'max': queue_to_create.get('max'),
+                               'dscp': queue_to_create.get('dscp'),
+                               'qos_marking':
+                               queue_to_create.get('qos_marking'),
+                               'tenant_id': tenant_id}}
+        return self.create_qos_queue(context, queue, False)['id']
+
+    def _validate_qos_queue(self, context, qos_queue):
+        if qos_queue.get('default'):
+            if context.is_admin:
+                if self.get_qos_queues(context, filters={'default': [True]}):
+                    raise ext_qos.DefaultQueueAlreadyExists()
+            else:
+                raise ext_qos.DefaultQueueCreateNotAdmin()
+        if (qos_queue.get('qos_marking') == 'trusted' and
+            not qos_queue.get('dscp')):
+            raise ext_qos.MissingDSCPForTrusted()
+        max = qos_queue.get('max')
+        min = qos_queue.get('min')
+        # Max can be None
+        if max and min > max:
+            raise ext_qos.QueueMinGreaterMax()
+
+    def _nvp_lqueue(self, queue):
+        """Convert fields to nvp fields."""
+        nvp_queue = {}
+        params = {'name': 'display_name',
+                  'qos_marking': 'qos_marking',
+                  'min': 'min_bandwidth_rate',
+                  'max': 'max_bandwidth_rate',
+                  'dscp': 'dscp'}
+        nvp_queue = dict(
+            (nvp_name, queue.get(api_name))
+            for api_name, nvp_name in params.iteritems()
+            if attr.is_attr_set(queue.get(api_name))
+        )
+        return nvp_queue
index ef8eec9cd7bd65e859cf6b8d025a8613b062c609..2732d6b0bcd38360149f1c739442ce49b0e09d9c 100644 (file)
@@ -50,6 +50,9 @@ LSWITCHPORT_RESOURCE = "lport-%s" % LSWITCH_RESOURCE
 LROUTER_RESOURCE = "lrouter"
 LROUTERPORT_RESOURCE = "lport-%s" % LROUTER_RESOURCE
 LROUTERNAT_RESOURCE = "nat-lrouter"
+LQUEUE_RESOURCE = "lqueue"
+# Current quantum version
+QUANTUM_VERSION = "2013.1"
 
 # Constants for NAT rules
 MATCH_KEYS = ["destination_ip_addresses", "destination_port_max",
@@ -573,7 +576,7 @@ def get_port(cluster, network, port, relations=None):
 
 
 def _configure_extensions(lport_obj, mac_address, fixed_ips,
-                          port_security_enabled, security_profiles):
+                          port_security_enabled, security_profiles, queue_id):
     lport_obj['allowed_address_pairs'] = []
     if port_security_enabled:
         for fixed_ip in fixed_ips:
@@ -587,12 +590,13 @@ def _configure_extensions(lport_obj, mac_address, fixed_ips,
             {"mac_address": mac_address,
              "ip_address": "0.0.0.0"})
     lport_obj['security_profiles'] = list(security_profiles or [])
+    lport_obj['queue_uuid'] = queue_id
 
 
 def update_port(cluster, lswitch_uuid, lport_uuid, quantum_port_id, tenant_id,
                 display_name, device_id, admin_status_enabled,
                 mac_address=None, fixed_ips=None, port_security_enabled=None,
-                security_profiles=None):
+                security_profiles=None, queue_id=None):
 
     # device_id can be longer than 40 so we rehash it
     hashed_device_id = hashlib.sha1(device_id).hexdigest()
@@ -604,7 +608,8 @@ def update_port(cluster, lswitch_uuid, lport_uuid, quantum_port_id, tenant_id,
               dict(scope='vm_id', tag=hashed_device_id)])
 
     _configure_extensions(lport_obj, mac_address, fixed_ips,
-                          port_security_enabled, security_profiles)
+                          port_security_enabled, security_profiles,
+                          queue_id)
 
     path = "/ws.v1/lswitch/" + lswitch_uuid + "/lport/" + lport_uuid
     try:
@@ -624,7 +629,7 @@ def update_port(cluster, lswitch_uuid, lport_uuid, quantum_port_id, tenant_id,
 def create_lport(cluster, lswitch_uuid, tenant_id, quantum_port_id,
                  display_name, device_id, admin_status_enabled,
                  mac_address=None, fixed_ips=None, port_security_enabled=None,
-                 security_profiles=None):
+                 security_profiles=None, queue_id=None):
     """ Creates a logical port on the assigned logical switch """
     # device_id can be longer than 40 so we rehash it
     hashed_device_id = hashlib.sha1(device_id).hexdigest()
@@ -637,7 +642,8 @@ def create_lport(cluster, lswitch_uuid, tenant_id, quantum_port_id,
     )
 
     _configure_extensions(lport_obj, mac_address, fixed_ips,
-                          port_security_enabled, security_profiles)
+                          port_security_enabled, security_profiles,
+                          queue_id)
 
     path = _build_uri_path(LSWITCHPORT_RESOURCE,
                            parent_resource_id=lswitch_uuid)
@@ -1184,3 +1190,29 @@ NVPLIB_FUNC_DICT = {
     'create_lrouter_snat_rule': {2: create_lrouter_snat_rule_v2,
                                  3: create_lrouter_snat_rule_v3}
 }
+
+
+# -----------------------------------------------------------------------------
+# QOS API Calls
+# -----------------------------------------------------------------------------
+def create_lqueue(cluster, lqueue):
+    uri = _build_uri_path(LQUEUE_RESOURCE)
+    lqueue['tags'] = [{'tag': QUANTUM_VERSION, 'scope': 'quantum'}]
+    try:
+        resp_obj = do_single_request(HTTP_POST, uri, json.dumps(lqueue),
+                                     cluster=cluster)
+    except NvpApiClient.NvpApiException:
+        LOG.exception(_("Failed to create logical queue"))
+        raise exception.QuantumException()
+    return json.loads(resp_obj)['uuid']
+
+
+def delete_lqueue(cluster, id):
+    try:
+        do_single_request(HTTP_DELETE,
+                          _build_uri_path(LQUEUE_RESOURCE,
+                                          resource_id=id),
+                          cluster=cluster)
+    except Exception:
+        LOG.exception(_("Failed to delete logical queue"))
+        raise exception.QuantumException()
diff --git a/quantum/tests/unit/nicira/etc/fake_post_lqueue.json b/quantum/tests/unit/nicira/etc/fake_post_lqueue.json
new file mode 100644 (file)
index 0000000..414945b
--- /dev/null
@@ -0,0 +1,11 @@
+{
+   "display_name": "%(display_name)s",
+   "uuid": "%(uuid)s",
+   "type": "LogicalSwitchConfig",
+   "_schema": "/ws.v1/schema/LogicalQueueConfig",
+   "dscp": "%(dscp)s",
+   "max_bandwidth_rate": "%(max_bandwidth_rate)s",
+   "min_bandwidth_rate": "%(min_bandwidth_rate)s",
+   "qos_marking": "%(qos_marking)s",
+   "_href": "/ws.v1/lqueue/%(uuid)s"
+}
index 893a6128616d896f8d2978ee1dc10ef7ba1cb664..bbbae9f15e1966f7b4f650e2511d221360b96a47 100644 (file)
@@ -30,6 +30,7 @@ class FakeClient:
     LPORT_RESOURCE = 'lport'
     LROUTER_RESOURCE = 'lrouter'
     NAT_RESOURCE = 'nat'
+    LQUEUE_RESOURCE = 'lqueue'
     SECPROF_RESOURCE = 'securityprofile'
     LSWITCH_STATUS = 'lswitchstatus'
     LROUTER_STATUS = 'lrouterstatus'
@@ -41,7 +42,7 @@ class FakeClient:
     LROUTER_LPORT_STATUS = 'lrouter_lportstatus'
     LROUTER_LPORT_ATT = 'lrouter_lportattachment'
 
-    RESOURCES = [LSWITCH_RESOURCE, LROUTER_RESOURCE,
+    RESOURCES = [LSWITCH_RESOURCE, LROUTER_RESOURCE, LQUEUE_RESOURCE,
                  LPORT_RESOURCE, NAT_RESOURCE, SECPROF_RESOURCE]
 
     FAKE_GET_RESPONSES = {
@@ -63,7 +64,8 @@ class FakeClient:
         LSWITCH_LPORT_RESOURCE: "fake_post_lswitch_lport.json",
         LROUTER_LPORT_RESOURCE: "fake_post_lrouter_lport.json",
         LROUTER_NAT_RESOURCE: "fake_post_lrouter_nat.json",
-        SECPROF_RESOURCE: "fake_post_security_profile.json"
+        SECPROF_RESOURCE: "fake_post_security_profile.json",
+        LQUEUE_RESOURCE: "fake_post_lqueue.json"
     }
 
     FAKE_PUT_RESPONSES = {
@@ -74,7 +76,8 @@ class FakeClient:
         LROUTER_NAT_RESOURCE: "fake_post_lrouter_nat.json",
         LSWITCH_LPORT_ATT: "fake_put_lswitch_lport_att.json",
         LROUTER_LPORT_ATT: "fake_put_lrouter_lport_att.json",
-        SECPROF_RESOURCE: "fake_post_security_profile.json"
+        SECPROF_RESOURCE: "fake_post_security_profile.json",
+        LQUEUE_RESOURCE: "fake_post_lqueue.json"
     }
 
     MANAGED_RELATIONS = {
@@ -92,6 +95,7 @@ class FakeClient:
     _fake_lswitch_lportstatus_dict = {}
     _fake_lrouter_lportstatus_dict = {}
     _fake_securityprofile_dict = {}
+    _fake_lqueue_dict = {}
 
     def __init__(self, fake_files_path):
         self.fake_files_path = fake_files_path
@@ -138,6 +142,12 @@ class FakeClient:
             'gateway_ip_address', '0.0.0.0')
         return fake_lrouter
 
+    def _add_lqueue(self, body):
+        fake_lqueue = json.loads(body)
+        fake_lqueue['uuid'] = uuidutils.generate_uuid()
+        self._fake_lqueue_dict[fake_lqueue['uuid']] = fake_lqueue
+        return fake_lqueue
+
     def _add_lswitch_lport(self, body, ls_uuid):
         fake_lport = json.loads(body)
         new_uuid = uuidutils.generate_uuid()
@@ -251,6 +261,7 @@ class FakeClient:
         /ws.v1/lrouter/zzz/status
         /ws.v1/lrouter/zzz/lport/www
         /ws.v1/lrouter/zzz/lport/www/status
+        /ws.v1/lqueue/xxx
         """
         # The first element will always be 'ws.v1' - so we just discard it
         uri_split = path.split('/')[1:]
@@ -487,3 +498,4 @@ class FakeClient:
         self._fake_lrouter_lport_dict.clear()
         self._fake_lswitch_lportstatus_dict.clear()
         self._fake_lrouter_lportstatus_dict.clear()
+        self._fake_lqueue_dict.clear()
index b51428a12131d08acf248ef0518e3c22918dda9e..94375c59bf57872307e1d528066bdfd9bc013637 100644 (file)
@@ -26,8 +26,11 @@ from quantum.extensions import providernet as pnet
 from quantum.extensions import securitygroup as secgrp
 from quantum import manager
 from quantum.openstack.common import cfg
+from quantum.plugins.nicira.nicira_nvp_plugin.extensions import (nvp_qos
+                                                                 as ext_qos)
 from quantum.plugins.nicira.nicira_nvp_plugin import nvplib
 from quantum.tests.unit.nicira import fake_nvpapiclient
+from quantum.tests.unit import test_extensions
 import quantum.tests.unit.test_db_plugin as test_plugin
 import quantum.tests.unit.test_extension_portsecurity as psec
 import quantum.tests.unit.test_extension_security_group as ext_sg
@@ -35,6 +38,7 @@ import quantum.tests.unit.test_l3_plugin as test_l3_plugin
 
 LOG = logging.getLogger(__name__)
 NICIRA_PKG_PATH = 'quantum.plugins.nicira.nicira_nvp_plugin'
+NICIRA_EXT_PATH = "../../plugins/nicira/nicira_nvp_plugin/extensions"
 
 
 class NiciraPluginV2TestCase(test_plugin.QuantumDbPluginV2TestCase):
@@ -149,11 +153,11 @@ class TestNiciraNetworksV2(test_plugin.TestNetworksV2,
     def _test_create_bridge_network(self, vlan_id=None):
         net_type = vlan_id and 'vlan' or 'flat'
         name = 'bridge_net'
-        keys = [('subnets', []), ('name', name), ('admin_state_up', True),
-                ('status', 'ACTIVE'), ('shared', False),
-                (pnet.NETWORK_TYPE, net_type),
-                (pnet.PHYSICAL_NETWORK, 'tzuuid'),
-                (pnet.SEGMENTATION_ID, vlan_id)]
+        expected = [('subnets', []), ('name', name), ('admin_state_up', True),
+                    ('status', 'ACTIVE'), ('shared', False),
+                    (pnet.NETWORK_TYPE, net_type),
+                    (pnet.PHYSICAL_NETWORK, 'tzuuid'),
+                    (pnet.SEGMENTATION_ID, vlan_id)]
         providernet_args = {pnet.NETWORK_TYPE: net_type,
                             pnet.PHYSICAL_NETWORK: 'tzuuid'}
         if vlan_id:
@@ -163,8 +167,8 @@ class TestNiciraNetworksV2(test_plugin.TestNetworksV2,
                           arg_list=(pnet.NETWORK_TYPE,
                                     pnet.PHYSICAL_NETWORK,
                                     pnet.SEGMENTATION_ID)) as net:
-            for k, v in keys:
-                self.assertEquals(net['network'][k], v)
+            for k, v in expected:
+                self.assertEqual(net['network'][k], v)
 
     def test_create_bridge_network(self):
         self._test_create_bridge_network()
@@ -175,7 +179,7 @@ class TestNiciraNetworksV2(test_plugin.TestNetworksV2,
     def test_create_bridge_vlan_network_outofrange_returns_400(self):
         with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
             self._test_create_bridge_network(vlan_id=5000)
-        self.assertEquals(ctx_manager.exception.code, 400)
+        self.assertEqual(ctx_manager.exception.code, 400)
 
     def test_list_networks_filter_by_id(self):
         # We add this unit test to cover some logic specific to the
@@ -259,3 +263,230 @@ class TestNiciraL3NatTestCase(test_l3_plugin.L3NatDBTestCase,
             self._test_floatingip_with_assoc_fails(
                 'quantum.plugins.nicira.nicira_nvp_plugin.'
                 'QuantumPlugin.NvpPluginV2')
+
+
+class NvpQoSTestExtensionManager(object):
+
+    def get_resources(self):
+        return ext_qos.Nvp_qos.get_resources()
+
+    def get_actions(self):
+        return []
+
+    def get_request_extensions(self):
+        return []
+
+
+class TestNiciraQoSQueue(NiciraPluginV2TestCase):
+
+    def setUp(self, plugin=None):
+        ext_path = os.path.join(os.path.dirname(os.path.dirname(__file__)),
+                                NICIRA_EXT_PATH)
+        cfg.CONF.set_override('api_extensions_path', ext_path)
+        super(TestNiciraQoSQueue, self).setUp()
+        ext_mgr = NvpQoSTestExtensionManager()
+        self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
+
+    def _create_qos_queue(self, fmt, body, **kwargs):
+        qos_queue = self.new_create_request('qos-queues', body)
+        if (kwargs.get('set_context') and 'tenant_id' in kwargs):
+            # create a specific auth context for this request
+            qos_queue.environ['quantum.context'] = context.Context(
+                '', kwargs['tenant_id'])
+
+        return qos_queue.get_response(self.ext_api)
+
+    @contextlib.contextmanager
+    def qos_queue(self, name='foo', min='0', max='10',
+                  qos_marking=None, dscp='0', default=None, no_delete=False):
+
+        body = {'qos_queue': {'tenant_id': 'tenant',
+                              'name': name,
+                              'min': min,
+                              'max': max}}
+
+        if qos_marking:
+            body['qos_queue']['qos_marking'] = qos_marking
+        if dscp:
+            body['qos_queue']['dscp'] = dscp
+        if default:
+            body['qos_queue']['default'] = default
+
+        res = self._create_qos_queue('json', body)
+        qos_queue = self.deserialize('json', res)
+        if res.status_int >= 400:
+            raise webob.exc.HTTPClientError(code=res.status_int)
+        try:
+            yield qos_queue
+        finally:
+            if not no_delete:
+                self._delete('qos-queues',
+                             qos_queue['qos_queue']['id'])
+
+    def test_create_qos_queue(self):
+        with self.qos_queue(name='fake_lqueue', min=34, max=44,
+                            qos_marking='untrusted', default=False) as q:
+            self.assertEqual(q['qos_queue']['name'], 'fake_lqueue')
+            self.assertEqual(q['qos_queue']['min'], 34)
+            self.assertEqual(q['qos_queue']['max'], 44)
+            self.assertEqual(q['qos_queue']['qos_marking'], 'untrusted')
+            self.assertFalse(q['qos_queue']['default'])
+
+    def test_create_qos_queue_default(self):
+        with self.qos_queue(default=True) as q:
+            self.assertTrue(q['qos_queue']['default'])
+
+    def test_create_qos_queue_two_default_queues_fail(self):
+        with self.qos_queue(default=True):
+            body = {'qos_queue': {'tenant_id': 'tenant',
+                                  'name': 'second_default_queue',
+                                  'default': True}}
+            res = self._create_qos_queue('json', body)
+            self.assertEqual(res.status_int, 409)
+
+    def test_create_port_with_queue(self):
+        with self.qos_queue(default=True) as q1:
+            res = self._create_network('json', 'net1', True,
+                                       arg_list=(ext_qos.QUEUE,),
+                                       queue_id=q1['qos_queue']['id'])
+            net1 = self.deserialize('json', res)
+            self.assertEqual(net1['network'][ext_qos.QUEUE],
+                             q1['qos_queue']['id'])
+            device_id = "00fff4d0-e4a8-4a3a-8906-4c4cdafb59f1"
+            with self.port(device_id=device_id, do_delete=False) as p:
+                self.assertEqual(len(p['port'][ext_qos.QUEUE]), 36)
+
+    def test_create_shared_queue_networks(self):
+        with self.qos_queue(default=True, no_delete=True) as q1:
+            res = self._create_network('json', 'net1', True,
+                                       arg_list=(ext_qos.QUEUE,),
+                                       queue_id=q1['qos_queue']['id'])
+            net1 = self.deserialize('json', res)
+            self.assertEqual(net1['network'][ext_qos.QUEUE],
+                             q1['qos_queue']['id'])
+            res = self._create_network('json', 'net2', True,
+                                       arg_list=(ext_qos.QUEUE,),
+                                       queue_id=q1['qos_queue']['id'])
+            net2 = self.deserialize('json', res)
+            self.assertEqual(net1['network'][ext_qos.QUEUE],
+                             q1['qos_queue']['id'])
+            device_id = "00fff4d0-e4a8-4a3a-8906-4c4cdafb59f1"
+            res = self._create_port('json', net1['network']['id'],
+                                    device_id=device_id)
+            port1 = self.deserialize('json', res)
+            res = self._create_port('json', net2['network']['id'],
+                                    device_id=device_id)
+            port2 = self.deserialize('json', res)
+            self.assertEqual(port1['port'][ext_qos.QUEUE],
+                             port2['port'][ext_qos.QUEUE])
+
+            self._delete('ports', port1['port']['id'])
+            self._delete('ports', port2['port']['id'])
+
+    def test_remove_queue_in_use_fail(self):
+        with self.qos_queue(no_delete=True) as q1:
+            res = self._create_network('json', 'net1', True,
+                                       arg_list=(ext_qos.QUEUE,),
+                                       queue_id=q1['qos_queue']['id'])
+            net1 = self.deserialize('json', res)
+            device_id = "00fff4d0-e4a8-4a3a-8906-4c4cdafb59f1"
+            res = self._create_port('json', net1['network']['id'],
+                                    device_id=device_id)
+            port = self.deserialize('json', res)
+            self._delete('qos-queues', port['port'][ext_qos.QUEUE], 409)
+
+    def test_update_network_new_queue(self):
+        with self.qos_queue() as q1:
+            res = self._create_network('json', 'net1', True,
+                                       arg_list=(ext_qos.QUEUE,),
+                                       queue_id=q1['qos_queue']['id'])
+            net1 = self.deserialize('json', res)
+            with self.qos_queue() as new_q:
+                data = {'network': {ext_qos.QUEUE: new_q['qos_queue']['id']}}
+                req = self.new_update_request('networks', data,
+                                              net1['network']['id'])
+                res = req.get_response(self.api)
+                net1 = self.deserialize('json', res)
+                self.assertEqual(net1['network'][ext_qos.QUEUE],
+                                 new_q['qos_queue']['id'])
+
+    def test_update_port_adding_device_id(self):
+        with self.qos_queue(no_delete=True) as q1:
+            res = self._create_network('json', 'net1', True,
+                                       arg_list=(ext_qos.QUEUE,),
+                                       queue_id=q1['qos_queue']['id'])
+            net1 = self.deserialize('json', res)
+            device_id = "00fff4d0-e4a8-4a3a-8906-4c4cdafb59f1"
+            res = self._create_port('json', net1['network']['id'])
+            port = self.deserialize('json', res)
+            self.assertEqual(port['port'][ext_qos.QUEUE], None)
+
+            data = {'port': {'device_id': device_id}}
+            req = self.new_update_request('ports', data,
+                                          port['port']['id'])
+
+            res = req.get_response(self.api)
+            port = self.deserialize('json', res)
+            self.assertEqual(len(port['port'][ext_qos.QUEUE]), 36)
+
+    def test_get_port_with_qos_not_admin(self):
+        body = {'qos_queue': {'tenant_id': 'not_admin',
+                              'name': 'foo', 'min': 20, 'max': 20}}
+        res = self._create_qos_queue('json', body, tenant_id='not_admin')
+        q1 = self.deserialize('json', res)
+        res = self._create_network('json', 'net1', True,
+                                   arg_list=(ext_qos.QUEUE, 'tenant_id',),
+                                   queue_id=q1['qos_queue']['id'],
+                                   tenant_id="not_admin")
+        net1 = self.deserialize('json', res)
+        self.assertEqual(len(net1['network'][ext_qos.QUEUE]), 36)
+        res = self._create_port('json', net1['network']['id'],
+                                tenant_id='not_admin', set_context=True)
+
+        port = self.deserialize('json', res)
+        self.assertEqual(ext_qos.QUEUE not in port['port'], True)
+
+    def test_non_admin_cannot_create_queue(self):
+        body = {'qos_queue': {'tenant_id': 'not_admin',
+                              'name': 'foo', 'min': 20, 'max': 20}}
+        res = self._create_qos_queue('json', body, tenant_id='not_admin',
+                                     set_context=True)
+        self.assertEqual(res.status_int, 403)
+
+    def test_update_port_non_admin_does_not_show_queue_id(self):
+        body = {'qos_queue': {'tenant_id': 'not_admin',
+                              'name': 'foo', 'min': 20, 'max': 20}}
+        res = self._create_qos_queue('json', body, tenant_id='not_admin')
+        q1 = self.deserialize('json', res)
+        res = self._create_network('json', 'net1', True,
+                                   arg_list=(ext_qos.QUEUE,),
+                                   tenant_id='not_admin',
+                                   queue_id=q1['qos_queue']['id'])
+
+        net1 = self.deserialize('json', res)
+        res = self._create_port('json', net1['network']['id'],
+                                tenant_id='not_admin', set_context=True)
+        port = self.deserialize('json', res)
+        device_id = "00fff4d0-e4a8-4a3a-8906-4c4cdafb59f1"
+        data = {'port': {'device_id': device_id}}
+        quantum_context = context.Context('', 'not_admin')
+        port = self._update('ports', port['port']['id'], data,
+                            quantum_context=quantum_context)
+        self.assertEqual(ext_qos.QUEUE not in port['port'], True)
+
+    def test_rxtx_factor(self):
+        with self.qos_queue(max=10) as q1:
+
+            res = self._create_network('json', 'net1', True,
+                                       arg_list=(ext_qos.QUEUE,),
+                                       queue_id=q1['qos_queue']['id'])
+            net1 = self.deserialize('json', res)
+            res = self._create_port('json', net1['network']['id'],
+                                    arg_list=(ext_qos.RXTX_FACTOR,),
+                                    rxtx_factor=2, device_id='1')
+            port = self.deserialize('json', res)
+            req = self.new_show_request('qos-queues',
+                                        port['port'][ext_qos.QUEUE])
+            res = req.get_response(self.ext_api)
+            queue = self.deserialize('json', res)
+            self.assertEqual(queue['qos_queue']['max'], 20)