"create_service_type": "rule:admin_only",
"update_service_type": "rule:admin_only",
"delete_service_type": "rule:admin_only",
- "get_service_type": "rule:regular_user"
+ "get_service_type": "rule:regular_user",
+
+ "create_qos_queue:": "rule:admin_only",
+ "get_qos_queue:": "rule:admin_only",
+ "get_qos_queues:": "rule:admin_only"
}
from quantum.plugins.nicira.nicira_nvp_plugin.common import config
from quantum.plugins.nicira.nicira_nvp_plugin.common import (exceptions
as nvp_exc)
+from quantum.plugins.nicira.nicira_nvp_plugin.extensions import (nvp_qos
+ as ext_qos)
from quantum.plugins.nicira.nicira_nvp_plugin import nicira_db
from quantum.plugins.nicira.nicira_nvp_plugin import NvpApiClient
from quantum.plugins.nicira.nicira_nvp_plugin import nvplib
from quantum.plugins.nicira.nicira_nvp_plugin import nvp_cluster
from quantum.plugins.nicira.nicira_nvp_plugin.nvp_plugin_version import (
PLUGIN_VERSION)
-
+from quantum.plugins.nicira.nicira_nvp_plugin import nicira_qos_db as qos_db
LOG = logging.getLogger("QuantumPlugin")
NVP_FLOATINGIP_NAT_RULES_ORDER = 200
NVP_EXTGW_NAT_RULES_ORDER = 255
l3_db.L3_NAT_db_mixin,
portsecurity_db.PortSecurityDbMixin,
securitygroups_db.SecurityGroupDbMixin,
- nvp_sec.NVPSecurityGroups):
+ nvp_sec.NVPSecurityGroups, qos_db.NVPQoSDbMixin):
"""
NvpPluginV2 is a Quantum plugin that provides L2 Virtual Network
functionality using NVP.
"""
supported_extension_aliases = ["provider", "quotas", "port-security",
- "router", "security-group"]
+ "router", "security-group", "nvp-qos"]
__native_bulk_support = True
# Default controller cluster
port_data['mac_address'],
port_data['fixed_ips'],
port_data[psec.PORTSECURITY],
- port_data[ext_sg.SECURITYGROUPS])
+ port_data[ext_sg.SECURITYGROUPS],
+ port_data[ext_qos.QUEUE])
nicira_db.add_quantum_nvp_port_mapping(
context.session, port_data['id'], lport['uuid'])
d_owner = port_data['device_owner']
network)
# Ensure there's an id in net_data
net_data['id'] = new_net['id']
+ # Process port security extension
self._process_network_create_port_security(context, net_data)
# DB Operations for setting the network as external
self._process_l3_create(context, net_data, new_net['id'])
+ # Process QoS queue extension
+ if network['network'].get(ext_qos.QUEUE):
+ new_net[ext_qos.QUEUE] = network['network'][ext_qos.QUEUE]
+ # Raises if not found
+ self.get_qos_queue(context, new_net[ext_qos.QUEUE])
+ self._process_network_queue_mapping(context, new_net)
+ self._extend_network_qos_queue(context, new_net)
+
if net_data.get(pnet.NETWORK_TYPE):
net_binding = nicira_db.add_network_binding(
context.session, new_net['id'],
return pairs
def get_network(self, context, id, fields=None):
- """
- Retrieves all attributes of the network, NOT including
- the ports of that network.
-
- :returns: a sequence of mappings with the following signature:
- {'id': UUID representing the network.
- 'name': Human-readable name identifying the network.
- 'tenant_id': Owner of network. only admin user
- can specify a tenant_id other than its own.
- 'admin_state_up': Sets admin state of network. if down,
- network does not forward packets.
- 'status': Indicates whether network is currently
- operational (limit values to "ACTIVE", "DOWN",
- "BUILD", and "ERROR"?
- 'subnets': Subnets associated with this network. Plan
- to allow fully specified subnets as part of
- network create.
- }
-
- :raises: exception.NetworkNotFound
- :raises: exception.QuantumException
- """
with context.session.begin(subtransactions=True):
# goto to the plugin DB and fetch the network
network = self._get_network(context, id)
self._extend_network_dict_provider(context, net_result)
self._extend_network_port_security_dict(context, net_result)
self._extend_network_dict_l3(context, net_result)
+ self._extend_network_qos_queue(context, net_result)
return self._fields(net_result, fields)
def get_networks(self, context, filters=None, fields=None):
self._extend_network_dict_provider(context, net)
self._extend_network_port_security_dict(context, net)
self._extend_network_dict_l3(context, net)
+ self._extend_network_qos_queue(context, net)
+
quantum_lswitches = self._filter_nets_l3(context,
quantum_lswitches,
filters)
if psec.PORTSECURITY in network['network']:
self._update_network_security_binding(
context, id, network['network'][psec.PORTSECURITY])
+ if network['network'].get(ext_qos.QUEUE):
+ net[ext_qos.QUEUE] = network['network'][ext_qos.QUEUE]
+ self._delete_network_queue_mapping(context, id)
+ self._process_network_queue_mapping(context, net)
self._extend_network_port_security_dict(context, net)
self._process_l3_update(context, network['network'], id)
self._extend_network_dict_provider(context, net)
self._extend_network_dict_l3(context, net)
+ self._extend_network_qos_queue(context, net)
return net
def get_ports(self, context, filters=None, fields=None):
self._get_security_groups_on_port(context, port))
self._process_port_create_security_group(
context, quantum_db['id'], port_data[ext_sg.SECURITYGROUPS])
+ # QoS extension checks
+ port_data[ext_qos.QUEUE] = self._check_for_queue_and_create(
+ context, port_data)
+ self._process_port_queue_mapping(context, port_data)
# provider networking extension checks
# Fetch the network and network binding from Quantum db
try:
LOG.debug(_("create_port completed on NVP for tenant "
"%(tenant_id)s: (%(id)s)"), port_data)
+ # remove since it will be added in extend based on policy
+ del port_data[ext_qos.QUEUE]
self._extend_port_port_security_dict(context, port_data)
self._extend_port_dict_security_group(context, port_data)
+ self._extend_port_qos_queue(context, port_data)
return port_data
def update_port(self, context, id, port):
if psec.PORTSECURITY in port['port']:
self._update_port_security_binding(
context, id, ret_port[psec.PORTSECURITY])
+
+ ret_port[ext_qos.QUEUE] = self._check_for_queue_and_create(
+ context, ret_port)
+ self._delete_port_queue_mapping(context, ret_port['id'])
+ self._process_port_queue_mapping(context, ret_port)
self._extend_port_port_security_dict(context, ret_port)
self._extend_port_dict_security_group(context, ret_port)
LOG.debug(_("Update port request: %s"), port)
ret_port['mac_address'],
ret_port['fixed_ips'],
ret_port[psec.PORTSECURITY],
- ret_port[ext_sg.SECURITYGROUPS])
+ ret_port[ext_sg.SECURITYGROUPS],
+ ret_port[ext_qos.QUEUE])
+ # remove since it will be added in extend based on policy
+ del ret_port[ext_qos.QUEUE]
+ self._extend_port_qos_queue(context, ret_port)
# Update the port status from nvp. If we fail here hide it since
# the port was successfully updated but we were not able to retrieve
# the status.
port_delete_func(context, quantum_db_port)
self.disassociate_floatingips(context, id)
with context.session.begin(subtransactions=True):
+ queue = self._get_port_queue_bindings(context, {'port_id': [id]})
if (cfg.CONF.metadata_dhcp_host_route and
quantum_db_port.device_owner == constants.DEVICE_OWNER_DHCP):
self._ensure_metadata_host_route(
context, quantum_db_port.fixed_ips[0], is_delete=True)
super(NvpPluginV2, self).delete_port(context, id)
+ # Delete qos queue if possible
+ if queue:
+ self.delete_qos_queue(context, queue[0]['queue_id'], False)
def get_port(self, context, id, fields=None):
with context.session.begin(subtransactions=True):
id, fields)
self._extend_port_port_security_dict(context, quantum_db_port)
self._extend_port_dict_security_group(context, quantum_db_port)
+ self._extend_port_qos_queue(context, quantum_db_port)
if self._network_is_external(context,
quantum_db_port['network_id']):
self.default_cluster, sgid, current_rules)
return super(NvpPluginV2, self).delete_security_group_rule(context,
sgrid)
+
+ def create_qos_queue(self, context, qos_queue, check_policy=True):
+ if check_policy:
+ self._enforce_set_auth(context, qos_queue,
+ ext_qos.qos_queue_create)
+ q = qos_queue.get('qos_queue')
+ self._validate_qos_queue(context, q)
+ q['id'] = nvplib.create_lqueue(self.default_cluster,
+ self._nvp_lqueue(q))
+ return super(NvpPluginV2, self).create_qos_queue(context, qos_queue)
+
+ def delete_qos_queue(self, context, id, raise_in_use=True):
+ filters = {'queue_id': [id]}
+ queues = self._get_port_queue_bindings(context, filters)
+ if queues:
+ if raise_in_use:
+ raise ext_qos.QueueInUseByPort()
+ else:
+ return
+ nvplib.delete_lqueue(self.default_cluster, id)
+ return super(NvpPluginV2, self).delete_qos_queue(context, id)
+
+ def get_qos_queue(self, context, id, fields=None):
+ if not self._check_view_auth(context, {'qos_queue': None},
+ ext_qos.qos_queue_get):
+ # don't want the user to find out that they guessed the right id
+ # so we raise not found if the policy.json file doesn't allow them
+ raise ext_qos.QueueNotFound(id=id)
+
+ return super(NvpPluginV2, self).get_qos_queue(context, id, fields)
+
+ def get_qos_queues(self, context, filters=None, fields=None):
+ if not self._check_view_auth(context, {'qos_queue': []},
+ ext_qos.qos_queue_list):
+ return []
+ return super(NvpPluginV2, self).get_qos_queues(context, filters,
+ fields)
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Nicira, Inc.
+# All Rights Reserved
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Aaron Rosen, Nicira Networks, Inc.
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Nicira, Inc.
+# All Rights Reserved
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Aaron Rosen, Nicira Networks, Inc.
+
+
+from abc import abstractmethod
+
+from quantum.api.v2 import attributes as attr
+from quantum.api.v2 import base
+from quantum.api import extensions
+from quantum.common import exceptions as qexception
+from quantum import manager
+
+
+# For policy.json/Auth
+qos_queue_create = "create_qos_queue"
+qos_queue_delete = "delete_qos_queue"
+qos_queue_get = "get_qos_queue"
+qos_queue_list = "get_qos_queues"
+
+
+class DefaultQueueCreateNotAdmin(qexception.InUse):
+ message = _("Need to be admin in order to create queue called default")
+
+
+class DefaultQueueAlreadyExists(qexception.InUse):
+ message = _("Default queue already exists.")
+
+
+class QueueInvalidDscp(qexception.InvalidInput):
+ message = _("Invalid value for dscp %(data)s must be integer.")
+
+
+class QueueMinGreaterMax(qexception.InvalidInput):
+ message = _("Invalid bandwidth rate, min greater than max.")
+
+
+class QueueInvalidBandwidth(qexception.InvalidInput):
+ message = _("Invalid bandwidth rate, %(data)s must be a non negative"
+ " integer.")
+
+
+class MissingDSCPForTrusted(qexception.InvalidInput):
+ message = _("No DSCP field needed when QoS workload marked trusted")
+
+
+class QueueNotFound(qexception.NotFound):
+ message = _("Queue %(id)s does not exist")
+
+
+class QueueInUseByPort(qexception.InUse):
+ message = _("Unable to delete queue attached to port.")
+
+
+class QueuePortBindingNotFound(qexception.NotFound):
+ message = _("Port is not associated with lqueue")
+
+
+def convert_to_unsigned_int_or_none(val):
+ if val is None:
+ return
+ try:
+ val = int(val)
+ if val < 0:
+ raise ValueError
+ except (ValueError, TypeError):
+ msg = _("'%s' must be a non negative integer.") % val
+ raise qexception.InvalidInput(error_message=msg)
+ return val
+
+# Attribute Map
+RESOURCE_ATTRIBUTE_MAP = {
+ 'qos_queues': {
+ 'id': {'allow_post': False, 'allow_put': False,
+ 'is_visible': True},
+ 'default': {'allow_post': True, 'allow_put': False,
+ 'convert_to': attr.convert_to_boolean,
+ 'is_visible': True, 'default': False},
+ 'name': {'allow_post': True, 'allow_put': False,
+ 'validate': {'type:string': None},
+ 'is_visible': True, 'default': ''},
+ 'min': {'allow_post': True, 'allow_put': False,
+ 'is_visible': True, 'default': '0',
+ 'convert_to': convert_to_unsigned_int_or_none},
+ 'max': {'allow_post': True, 'allow_put': False,
+ 'is_visible': True, 'default': None,
+ 'convert_to': convert_to_unsigned_int_or_none},
+ 'qos_marking': {'allow_post': True, 'allow_put': False,
+ 'validate': {'type:values': ['untrusted', 'trusted']},
+ 'default': 'untrusted', 'is_visible': True},
+ 'dscp': {'allow_post': True, 'allow_put': False,
+ 'is_visible': True, 'default': '0',
+ 'convert_to': convert_to_unsigned_int_or_none},
+ 'tenant_id': {'allow_post': True, 'allow_put': False,
+ 'required_by_policy': True,
+ 'validate': {'type:string': None},
+ 'is_visible': True},
+ },
+}
+
+
+QUEUE = 'queue_id'
+RXTX_FACTOR = 'rxtx_factor'
+EXTENDED_ATTRIBUTES_2_0 = {
+ 'ports': {
+ RXTX_FACTOR: {'allow_post': True,
+ 'allow_put': False,
+ 'is_visible': False,
+ 'default': 1,
+ 'convert_to': convert_to_unsigned_int_or_none},
+
+ QUEUE: {'allow_post': False,
+ 'allow_put': False,
+ 'is_visible': True,
+ 'default': False}},
+ 'networks': {QUEUE: {'allow_post': True,
+ 'allow_put': True,
+ 'is_visible': True,
+ 'default': False}}
+
+}
+
+
+class Nvp_qos(object):
+ """Port Queue extension"""
+
+ @classmethod
+ def get_name(cls):
+ return "nvp-qos"
+
+ @classmethod
+ def get_alias(cls):
+ return "nvp-qos"
+
+ @classmethod
+ def get_description(cls):
+ return "NVP QoS extension."
+
+ @classmethod
+ def get_namespace(cls):
+ return "http://docs.openstack.org/ext/nvp-qos/api/v2.0"
+
+ @classmethod
+ def get_updated(cls):
+ return "2012-10-05T10:00:00-00:00"
+
+ @classmethod
+ def get_resources(cls):
+ """ Returns Ext Resources """
+ exts = []
+ plugin = manager.QuantumManager.get_plugin()
+ resource_name = 'qos_queue'
+ collection_name = resource_name.replace('_', '-') + "s"
+ params = RESOURCE_ATTRIBUTE_MAP.get(resource_name + "s", dict())
+ controller = base.create_resource(collection_name,
+ resource_name,
+ plugin, params, allow_bulk=False)
+
+ ex = extensions.ResourceExtension(collection_name,
+ controller)
+ exts.append(ex)
+
+ return exts
+
+ def get_extended_resources(self, version):
+ if version == "2.0":
+ return EXTENDED_ATTRIBUTES_2_0
+ else:
+ return {}
+
+
+class QueuePluginBase(object):
+ @abstractmethod
+ def create_qos_queue(self, context, queue):
+ pass
+
+ @abstractmethod
+ def delete_qos_queue(self, context, id):
+ pass
+
+ @abstractmethod
+ def get_qos_queue(self, context, id, fields=None):
+ pass
+
+ @abstractmethod
+ def get_qos_queues(self, context, filters=None, fields=None):
+ pass
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 Nicira Networks, Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Aaron Rosen, Nicira, Inc
+
+import sqlalchemy as sa
+from sqlalchemy.orm import exc
+
+from quantum.api.v2 import attributes as attr
+from quantum.db import model_base
+from quantum.db import models_v2
+from quantum.openstack.common import uuidutils
+from quantum.plugins.nicira.nicira_nvp_plugin.extensions import (nvp_qos
+ as ext_qos)
+
+
+class QoSQueue(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
+ name = sa.Column(sa.String(255))
+ default = sa.Column(sa.Boolean, default=False)
+ min = sa.Column(sa.Integer, nullable=False)
+ max = sa.Column(sa.Integer, nullable=True)
+ qos_marking = sa.Column(sa.Enum('untrusted', 'trusted',
+ name='qosqueues_qos_marking'))
+ dscp = sa.Column(sa.Integer)
+
+
+class PortQueueMapping(model_base.BASEV2):
+ port_id = sa.Column(sa.String(36),
+ sa.ForeignKey("ports.id", ondelete="CASCADE"),
+ primary_key=True)
+
+ queue_id = sa.Column(sa.String(36), sa.ForeignKey("qosqueues.id"),
+ primary_key=True)
+
+
+class NetworkQueueMapping(model_base.BASEV2):
+ network_id = sa.Column(sa.String(36),
+ sa.ForeignKey("networks.id", ondelete="CASCADE"),
+ primary_key=True)
+
+ queue_id = sa.Column(sa.String(36), sa.ForeignKey("qosqueues.id",
+ ondelete="CASCADE"))
+
+
+class NVPQoSDbMixin(ext_qos.QueuePluginBase):
+ """Mixin class to add queues."""
+
+ def create_qos_queue(self, context, qos_queue):
+ q = qos_queue['qos_queue']
+ with context.session.begin(subtransactions=True):
+ qos_queue = QoSQueue(id=q.get('id', uuidutils.generate_uuid()),
+ name=q.get('name'),
+ tenant_id=q['tenant_id'],
+ default=q.get('default'),
+ min=q.get('min'),
+ max=q.get('max'),
+ qos_marking=q.get('qos_marking'),
+ dscp=q.get('dscp'))
+ context.session.add(qos_queue)
+ return self._make_qos_queue_dict(qos_queue)
+
+ def get_qos_queue(self, context, id, fields=None):
+ return self._make_qos_queue_dict(
+ self._get_qos_queue(context, id), fields)
+
+ def _get_qos_queue(self, context, id):
+ try:
+ return self._get_by_id(context, QoSQueue, id)
+ except exc.NoResultFound:
+ raise ext_qos.QueueNotFound(id=id)
+
+ def get_qos_queues(self, context, filters=None, fields=None):
+ return self._get_collection(context, QoSQueue,
+ self._make_qos_queue_dict,
+ filters=filters, fields=fields)
+
+ def delete_qos_queue(self, context, id):
+ qos_queue = self._get_qos_queue(context, id)
+ with context.session.begin(subtransactions=True):
+ context.session.delete(qos_queue)
+
+ def _process_port_queue_mapping(self, context, p):
+ if not p.get(ext_qos.QUEUE):
+ return
+ with context.session.begin(subtransactions=True):
+ db = PortQueueMapping(port_id=p['id'],
+ queue_id=p.get(ext_qos.QUEUE))
+ context.session.add(db)
+
+ def _get_port_queue_bindings(self, context, filters=None, fields=None):
+ return self._get_collection(context, PortQueueMapping,
+ self._make_port_queue_binding_dict,
+ filters=filters, fields=fields)
+
+ def _delete_port_queue_mapping(self, context, port_id):
+ query = self._model_query(context, PortQueueMapping)
+ try:
+ binding = query.filter(PortQueueMapping.port_id == port_id).one()
+ except exc.NoResultFound:
+ # return since this can happen if we are updating a port that
+ # did not already have a queue on it. There is no need to check
+ # if there is one before deleting if we return here.
+ return
+ with context.session.begin(subtransactions=True):
+ context.session.delete(binding)
+
+ def _process_network_queue_mapping(self, context, network):
+ if not network.get(ext_qos.QUEUE):
+ return
+ with context.session.begin(subtransactions=True):
+ db = NetworkQueueMapping(network_id=network['id'],
+ queue_id=network.get(ext_qos.QUEUE))
+ context.session.add(db)
+
+ def _get_network_queue_bindings(self, context, filters=None, fields=None):
+ return self._get_collection(context, NetworkQueueMapping,
+ self._make_network_queue_binding_dict,
+ filters=filters, fields=fields)
+
+ def _delete_network_queue_mapping(self, context, network_id):
+ query = self._model_query(context, NetworkQueueMapping)
+ try:
+ with context.session.begin(subtransactions=True):
+ binding = query.filter_by(network_id=network_id).first()
+ if binding:
+ context.session.delete(binding)
+ except exc.NoResultFound:
+ # return since this can happen if we are updating a port that
+ # did not already have a queue on it. There is no need to check
+ # if there is one before deleting if we return here.
+ return
+
+ def _extend_port_qos_queue(self, context, port):
+ if self._check_view_auth(context, {'qos_queue': None},
+ ext_qos.qos_queue_get):
+ filters = {'port_id': [port['id']]}
+ fields = ['queue_id']
+ port[ext_qos.QUEUE] = None
+ queue_id = self._get_port_queue_bindings(
+ context, filters, fields)
+ if queue_id:
+ port[ext_qos.QUEUE] = queue_id[0]['queue_id']
+ return port
+
+ def _extend_network_qos_queue(self, context, network):
+ if self._check_view_auth(context, {'qos_queue': None},
+ ext_qos.qos_queue_get):
+ filters = {'network_id': [network['id']]}
+ fields = ['queue_id']
+ network[ext_qos.QUEUE] = None
+ queue_id = self._get_network_queue_bindings(
+ context, filters, fields)
+ if queue_id:
+ network[ext_qos.QUEUE] = queue_id[0]['queue_id']
+ return network
+
+ def _make_qos_queue_dict(self, queue, fields=None):
+ res = {'id': queue['id'],
+ 'name': queue.get('name'),
+ 'default': queue.get('default'),
+ 'tenant_id': queue['tenant_id'],
+ 'min': queue.get('min'),
+ 'max': queue.get('max'),
+ 'qos_marking': queue.get('qos_marking'),
+ 'dscp': queue.get('dscp')}
+ return self._fields(res, fields)
+
+ def _make_port_queue_binding_dict(self, queue, fields=None):
+ res = {'port_id': queue['port_id'],
+ 'queue_id': queue['queue_id']}
+ return self._fields(res, fields)
+
+ def _make_network_queue_binding_dict(self, queue, fields=None):
+ res = {'network_id': queue['network_id'],
+ 'queue_id': queue['queue_id']}
+ return self._fields(res, fields)
+
+ def _check_for_queue_and_create(self, context, port):
+ """This function determines if a port should be associated with a
+ queue. It works by first querying NetworkQueueMapping to determine
+ if the network is associated with a queue. If so, then it queries
+ NetworkQueueMapping for all the networks that are associated with
+ this queue. Next, it queries against all the ports on these networks
+ with the port device_id. Finally it queries PortQueueMapping. If that
+ query returns a queue_id that is returned. Otherwise a queue is
+ created that is the size of the queue associated with the network and
+ that queue_id is returned.
+
+ If the network is not associated with a queue we then query to see
+ if there is a default queue in the system. If so, a copy of that is
+ created and the queue_id is returned.
+
+ Otherwise None is returned. None is also returned if the port does not
+ have a device_id or if the device_owner is network:
+ """
+
+ queue_to_create = None
+ # If there is no device_id don't create a queue. The queue will be
+ # created on update port when the device_id is present. Also don't
+ # apply QoS to network ports.
+ if (not port.get('device_id') or
+ port['device_owner'].startswith('network:')):
+ return
+
+ # Check if there is a queue assocated with the network
+ filters = {'network_id': [port['network_id']]}
+ network_queue_id = self._get_network_queue_bindings(
+ context, filters, ['queue_id'])
+
+ if network_queue_id:
+ # get networks that queue is assocated with
+ filters = {'queue_id': [network_queue_id[0]['queue_id']]}
+ networks_with_same_queue = self._get_network_queue_bindings(
+ context, filters)
+
+ # get the ports on these networks with the same_queue and device_id
+ filters = {'device_id': [port.get('device_id')],
+ 'network_id': [network['network_id'] for
+ network in networks_with_same_queue]}
+ query = self._model_query(context, models_v2.Port)
+ ports = self._apply_filters_to_query(query, models_v2.Port,
+ filters).all()
+
+ if ports:
+ # shared queue already exists find the queue id
+ filters = {'port_id': [p['id'] for p in ports]}
+ queues = self._get_port_queue_bindings(context, filters,
+ ['queue_id'])
+ if queues:
+ return queues[0]['queue_id']
+
+ # get the size of the queue we want to create
+ queue_to_create = self._get_qos_queue(
+ context, network_queue_id[0]['queue_id'])
+
+ else:
+ # check for default queue
+ filters = {'default': [True]}
+ # context is elevated since default queue is owned by admin
+ queue_to_create = self.get_qos_queues(context.elevated(), filters)
+ if not queue_to_create:
+ return
+ queue_to_create = queue_to_create[0]
+
+ # create the queue
+ tenant_id = self._get_tenant_id_for_create(context, port)
+ if port.get(ext_qos.RXTX_FACTOR) and queue_to_create.get('max'):
+ queue_to_create['max'] *= int(port[ext_qos.RXTX_FACTOR])
+ queue = {'qos_queue': {'name': queue_to_create.get('name'),
+ 'min': queue_to_create.get('min'),
+ 'max': queue_to_create.get('max'),
+ 'dscp': queue_to_create.get('dscp'),
+ 'qos_marking':
+ queue_to_create.get('qos_marking'),
+ 'tenant_id': tenant_id}}
+ return self.create_qos_queue(context, queue, False)['id']
+
+ def _validate_qos_queue(self, context, qos_queue):
+ if qos_queue.get('default'):
+ if context.is_admin:
+ if self.get_qos_queues(context, filters={'default': [True]}):
+ raise ext_qos.DefaultQueueAlreadyExists()
+ else:
+ raise ext_qos.DefaultQueueCreateNotAdmin()
+ if (qos_queue.get('qos_marking') == 'trusted' and
+ not qos_queue.get('dscp')):
+ raise ext_qos.MissingDSCPForTrusted()
+ max = qos_queue.get('max')
+ min = qos_queue.get('min')
+ # Max can be None
+ if max and min > max:
+ raise ext_qos.QueueMinGreaterMax()
+
+ def _nvp_lqueue(self, queue):
+ """Convert fields to nvp fields."""
+ nvp_queue = {}
+ params = {'name': 'display_name',
+ 'qos_marking': 'qos_marking',
+ 'min': 'min_bandwidth_rate',
+ 'max': 'max_bandwidth_rate',
+ 'dscp': 'dscp'}
+ nvp_queue = dict(
+ (nvp_name, queue.get(api_name))
+ for api_name, nvp_name in params.iteritems()
+ if attr.is_attr_set(queue.get(api_name))
+ )
+ return nvp_queue
LROUTER_RESOURCE = "lrouter"
LROUTERPORT_RESOURCE = "lport-%s" % LROUTER_RESOURCE
LROUTERNAT_RESOURCE = "nat-lrouter"
+LQUEUE_RESOURCE = "lqueue"
+# Current quantum version
+QUANTUM_VERSION = "2013.1"
# Constants for NAT rules
MATCH_KEYS = ["destination_ip_addresses", "destination_port_max",
def _configure_extensions(lport_obj, mac_address, fixed_ips,
- port_security_enabled, security_profiles):
+ port_security_enabled, security_profiles, queue_id):
lport_obj['allowed_address_pairs'] = []
if port_security_enabled:
for fixed_ip in fixed_ips:
{"mac_address": mac_address,
"ip_address": "0.0.0.0"})
lport_obj['security_profiles'] = list(security_profiles or [])
+ lport_obj['queue_uuid'] = queue_id
def update_port(cluster, lswitch_uuid, lport_uuid, quantum_port_id, tenant_id,
display_name, device_id, admin_status_enabled,
mac_address=None, fixed_ips=None, port_security_enabled=None,
- security_profiles=None):
+ security_profiles=None, queue_id=None):
# device_id can be longer than 40 so we rehash it
hashed_device_id = hashlib.sha1(device_id).hexdigest()
dict(scope='vm_id', tag=hashed_device_id)])
_configure_extensions(lport_obj, mac_address, fixed_ips,
- port_security_enabled, security_profiles)
+ port_security_enabled, security_profiles,
+ queue_id)
path = "/ws.v1/lswitch/" + lswitch_uuid + "/lport/" + lport_uuid
try:
def create_lport(cluster, lswitch_uuid, tenant_id, quantum_port_id,
display_name, device_id, admin_status_enabled,
mac_address=None, fixed_ips=None, port_security_enabled=None,
- security_profiles=None):
+ security_profiles=None, queue_id=None):
""" Creates a logical port on the assigned logical switch """
# device_id can be longer than 40 so we rehash it
hashed_device_id = hashlib.sha1(device_id).hexdigest()
)
_configure_extensions(lport_obj, mac_address, fixed_ips,
- port_security_enabled, security_profiles)
+ port_security_enabled, security_profiles,
+ queue_id)
path = _build_uri_path(LSWITCHPORT_RESOURCE,
parent_resource_id=lswitch_uuid)
'create_lrouter_snat_rule': {2: create_lrouter_snat_rule_v2,
3: create_lrouter_snat_rule_v3}
}
+
+
+# -----------------------------------------------------------------------------
+# QOS API Calls
+# -----------------------------------------------------------------------------
+def create_lqueue(cluster, lqueue):
+ uri = _build_uri_path(LQUEUE_RESOURCE)
+ lqueue['tags'] = [{'tag': QUANTUM_VERSION, 'scope': 'quantum'}]
+ try:
+ resp_obj = do_single_request(HTTP_POST, uri, json.dumps(lqueue),
+ cluster=cluster)
+ except NvpApiClient.NvpApiException:
+ LOG.exception(_("Failed to create logical queue"))
+ raise exception.QuantumException()
+ return json.loads(resp_obj)['uuid']
+
+
+def delete_lqueue(cluster, id):
+ try:
+ do_single_request(HTTP_DELETE,
+ _build_uri_path(LQUEUE_RESOURCE,
+ resource_id=id),
+ cluster=cluster)
+ except Exception:
+ LOG.exception(_("Failed to delete logical queue"))
+ raise exception.QuantumException()
--- /dev/null
+{
+ "display_name": "%(display_name)s",
+ "uuid": "%(uuid)s",
+ "type": "LogicalSwitchConfig",
+ "_schema": "/ws.v1/schema/LogicalQueueConfig",
+ "dscp": "%(dscp)s",
+ "max_bandwidth_rate": "%(max_bandwidth_rate)s",
+ "min_bandwidth_rate": "%(min_bandwidth_rate)s",
+ "qos_marking": "%(qos_marking)s",
+ "_href": "/ws.v1/lqueue/%(uuid)s"
+}
LPORT_RESOURCE = 'lport'
LROUTER_RESOURCE = 'lrouter'
NAT_RESOURCE = 'nat'
+ LQUEUE_RESOURCE = 'lqueue'
SECPROF_RESOURCE = 'securityprofile'
LSWITCH_STATUS = 'lswitchstatus'
LROUTER_STATUS = 'lrouterstatus'
LROUTER_LPORT_STATUS = 'lrouter_lportstatus'
LROUTER_LPORT_ATT = 'lrouter_lportattachment'
- RESOURCES = [LSWITCH_RESOURCE, LROUTER_RESOURCE,
+ RESOURCES = [LSWITCH_RESOURCE, LROUTER_RESOURCE, LQUEUE_RESOURCE,
LPORT_RESOURCE, NAT_RESOURCE, SECPROF_RESOURCE]
FAKE_GET_RESPONSES = {
LSWITCH_LPORT_RESOURCE: "fake_post_lswitch_lport.json",
LROUTER_LPORT_RESOURCE: "fake_post_lrouter_lport.json",
LROUTER_NAT_RESOURCE: "fake_post_lrouter_nat.json",
- SECPROF_RESOURCE: "fake_post_security_profile.json"
+ SECPROF_RESOURCE: "fake_post_security_profile.json",
+ LQUEUE_RESOURCE: "fake_post_lqueue.json"
}
FAKE_PUT_RESPONSES = {
LROUTER_NAT_RESOURCE: "fake_post_lrouter_nat.json",
LSWITCH_LPORT_ATT: "fake_put_lswitch_lport_att.json",
LROUTER_LPORT_ATT: "fake_put_lrouter_lport_att.json",
- SECPROF_RESOURCE: "fake_post_security_profile.json"
+ SECPROF_RESOURCE: "fake_post_security_profile.json",
+ LQUEUE_RESOURCE: "fake_post_lqueue.json"
}
MANAGED_RELATIONS = {
_fake_lswitch_lportstatus_dict = {}
_fake_lrouter_lportstatus_dict = {}
_fake_securityprofile_dict = {}
+ _fake_lqueue_dict = {}
def __init__(self, fake_files_path):
self.fake_files_path = fake_files_path
'gateway_ip_address', '0.0.0.0')
return fake_lrouter
+ def _add_lqueue(self, body):
+ fake_lqueue = json.loads(body)
+ fake_lqueue['uuid'] = uuidutils.generate_uuid()
+ self._fake_lqueue_dict[fake_lqueue['uuid']] = fake_lqueue
+ return fake_lqueue
+
def _add_lswitch_lport(self, body, ls_uuid):
fake_lport = json.loads(body)
new_uuid = uuidutils.generate_uuid()
/ws.v1/lrouter/zzz/status
/ws.v1/lrouter/zzz/lport/www
/ws.v1/lrouter/zzz/lport/www/status
+ /ws.v1/lqueue/xxx
"""
# The first element will always be 'ws.v1' - so we just discard it
uri_split = path.split('/')[1:]
self._fake_lrouter_lport_dict.clear()
self._fake_lswitch_lportstatus_dict.clear()
self._fake_lrouter_lportstatus_dict.clear()
+ self._fake_lqueue_dict.clear()
from quantum.extensions import securitygroup as secgrp
from quantum import manager
from quantum.openstack.common import cfg
+from quantum.plugins.nicira.nicira_nvp_plugin.extensions import (nvp_qos
+ as ext_qos)
from quantum.plugins.nicira.nicira_nvp_plugin import nvplib
from quantum.tests.unit.nicira import fake_nvpapiclient
+from quantum.tests.unit import test_extensions
import quantum.tests.unit.test_db_plugin as test_plugin
import quantum.tests.unit.test_extension_portsecurity as psec
import quantum.tests.unit.test_extension_security_group as ext_sg
LOG = logging.getLogger(__name__)
NICIRA_PKG_PATH = 'quantum.plugins.nicira.nicira_nvp_plugin'
+NICIRA_EXT_PATH = "../../plugins/nicira/nicira_nvp_plugin/extensions"
class NiciraPluginV2TestCase(test_plugin.QuantumDbPluginV2TestCase):
def _test_create_bridge_network(self, vlan_id=None):
net_type = vlan_id and 'vlan' or 'flat'
name = 'bridge_net'
- keys = [('subnets', []), ('name', name), ('admin_state_up', True),
- ('status', 'ACTIVE'), ('shared', False),
- (pnet.NETWORK_TYPE, net_type),
- (pnet.PHYSICAL_NETWORK, 'tzuuid'),
- (pnet.SEGMENTATION_ID, vlan_id)]
+ expected = [('subnets', []), ('name', name), ('admin_state_up', True),
+ ('status', 'ACTIVE'), ('shared', False),
+ (pnet.NETWORK_TYPE, net_type),
+ (pnet.PHYSICAL_NETWORK, 'tzuuid'),
+ (pnet.SEGMENTATION_ID, vlan_id)]
providernet_args = {pnet.NETWORK_TYPE: net_type,
pnet.PHYSICAL_NETWORK: 'tzuuid'}
if vlan_id:
arg_list=(pnet.NETWORK_TYPE,
pnet.PHYSICAL_NETWORK,
pnet.SEGMENTATION_ID)) as net:
- for k, v in keys:
- self.assertEquals(net['network'][k], v)
+ for k, v in expected:
+ self.assertEqual(net['network'][k], v)
def test_create_bridge_network(self):
self._test_create_bridge_network()
def test_create_bridge_vlan_network_outofrange_returns_400(self):
with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
self._test_create_bridge_network(vlan_id=5000)
- self.assertEquals(ctx_manager.exception.code, 400)
+ self.assertEqual(ctx_manager.exception.code, 400)
def test_list_networks_filter_by_id(self):
# We add this unit test to cover some logic specific to the
self._test_floatingip_with_assoc_fails(
'quantum.plugins.nicira.nicira_nvp_plugin.'
'QuantumPlugin.NvpPluginV2')
+
+
+class NvpQoSTestExtensionManager(object):
+
+ def get_resources(self):
+ return ext_qos.Nvp_qos.get_resources()
+
+ def get_actions(self):
+ return []
+
+ def get_request_extensions(self):
+ return []
+
+
+class TestNiciraQoSQueue(NiciraPluginV2TestCase):
+
+ def setUp(self, plugin=None):
+ ext_path = os.path.join(os.path.dirname(os.path.dirname(__file__)),
+ NICIRA_EXT_PATH)
+ cfg.CONF.set_override('api_extensions_path', ext_path)
+ super(TestNiciraQoSQueue, self).setUp()
+ ext_mgr = NvpQoSTestExtensionManager()
+ self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
+
+ def _create_qos_queue(self, fmt, body, **kwargs):
+ qos_queue = self.new_create_request('qos-queues', body)
+ if (kwargs.get('set_context') and 'tenant_id' in kwargs):
+ # create a specific auth context for this request
+ qos_queue.environ['quantum.context'] = context.Context(
+ '', kwargs['tenant_id'])
+
+ return qos_queue.get_response(self.ext_api)
+
+ @contextlib.contextmanager
+ def qos_queue(self, name='foo', min='0', max='10',
+ qos_marking=None, dscp='0', default=None, no_delete=False):
+
+ body = {'qos_queue': {'tenant_id': 'tenant',
+ 'name': name,
+ 'min': min,
+ 'max': max}}
+
+ if qos_marking:
+ body['qos_queue']['qos_marking'] = qos_marking
+ if dscp:
+ body['qos_queue']['dscp'] = dscp
+ if default:
+ body['qos_queue']['default'] = default
+
+ res = self._create_qos_queue('json', body)
+ qos_queue = self.deserialize('json', res)
+ if res.status_int >= 400:
+ raise webob.exc.HTTPClientError(code=res.status_int)
+ try:
+ yield qos_queue
+ finally:
+ if not no_delete:
+ self._delete('qos-queues',
+ qos_queue['qos_queue']['id'])
+
+ def test_create_qos_queue(self):
+ with self.qos_queue(name='fake_lqueue', min=34, max=44,
+ qos_marking='untrusted', default=False) as q:
+ self.assertEqual(q['qos_queue']['name'], 'fake_lqueue')
+ self.assertEqual(q['qos_queue']['min'], 34)
+ self.assertEqual(q['qos_queue']['max'], 44)
+ self.assertEqual(q['qos_queue']['qos_marking'], 'untrusted')
+ self.assertFalse(q['qos_queue']['default'])
+
+ def test_create_qos_queue_default(self):
+ with self.qos_queue(default=True) as q:
+ self.assertTrue(q['qos_queue']['default'])
+
+ def test_create_qos_queue_two_default_queues_fail(self):
+ with self.qos_queue(default=True):
+ body = {'qos_queue': {'tenant_id': 'tenant',
+ 'name': 'second_default_queue',
+ 'default': True}}
+ res = self._create_qos_queue('json', body)
+ self.assertEqual(res.status_int, 409)
+
+ def test_create_port_with_queue(self):
+ with self.qos_queue(default=True) as q1:
+ res = self._create_network('json', 'net1', True,
+ arg_list=(ext_qos.QUEUE,),
+ queue_id=q1['qos_queue']['id'])
+ net1 = self.deserialize('json', res)
+ self.assertEqual(net1['network'][ext_qos.QUEUE],
+ q1['qos_queue']['id'])
+ device_id = "00fff4d0-e4a8-4a3a-8906-4c4cdafb59f1"
+ with self.port(device_id=device_id, do_delete=False) as p:
+ self.assertEqual(len(p['port'][ext_qos.QUEUE]), 36)
+
+ def test_create_shared_queue_networks(self):
+ with self.qos_queue(default=True, no_delete=True) as q1:
+ res = self._create_network('json', 'net1', True,
+ arg_list=(ext_qos.QUEUE,),
+ queue_id=q1['qos_queue']['id'])
+ net1 = self.deserialize('json', res)
+ self.assertEqual(net1['network'][ext_qos.QUEUE],
+ q1['qos_queue']['id'])
+ res = self._create_network('json', 'net2', True,
+ arg_list=(ext_qos.QUEUE,),
+ queue_id=q1['qos_queue']['id'])
+ net2 = self.deserialize('json', res)
+ self.assertEqual(net1['network'][ext_qos.QUEUE],
+ q1['qos_queue']['id'])
+ device_id = "00fff4d0-e4a8-4a3a-8906-4c4cdafb59f1"
+ res = self._create_port('json', net1['network']['id'],
+ device_id=device_id)
+ port1 = self.deserialize('json', res)
+ res = self._create_port('json', net2['network']['id'],
+ device_id=device_id)
+ port2 = self.deserialize('json', res)
+ self.assertEqual(port1['port'][ext_qos.QUEUE],
+ port2['port'][ext_qos.QUEUE])
+
+ self._delete('ports', port1['port']['id'])
+ self._delete('ports', port2['port']['id'])
+
+ def test_remove_queue_in_use_fail(self):
+ with self.qos_queue(no_delete=True) as q1:
+ res = self._create_network('json', 'net1', True,
+ arg_list=(ext_qos.QUEUE,),
+ queue_id=q1['qos_queue']['id'])
+ net1 = self.deserialize('json', res)
+ device_id = "00fff4d0-e4a8-4a3a-8906-4c4cdafb59f1"
+ res = self._create_port('json', net1['network']['id'],
+ device_id=device_id)
+ port = self.deserialize('json', res)
+ self._delete('qos-queues', port['port'][ext_qos.QUEUE], 409)
+
+ def test_update_network_new_queue(self):
+ with self.qos_queue() as q1:
+ res = self._create_network('json', 'net1', True,
+ arg_list=(ext_qos.QUEUE,),
+ queue_id=q1['qos_queue']['id'])
+ net1 = self.deserialize('json', res)
+ with self.qos_queue() as new_q:
+ data = {'network': {ext_qos.QUEUE: new_q['qos_queue']['id']}}
+ req = self.new_update_request('networks', data,
+ net1['network']['id'])
+ res = req.get_response(self.api)
+ net1 = self.deserialize('json', res)
+ self.assertEqual(net1['network'][ext_qos.QUEUE],
+ new_q['qos_queue']['id'])
+
+ def test_update_port_adding_device_id(self):
+ with self.qos_queue(no_delete=True) as q1:
+ res = self._create_network('json', 'net1', True,
+ arg_list=(ext_qos.QUEUE,),
+ queue_id=q1['qos_queue']['id'])
+ net1 = self.deserialize('json', res)
+ device_id = "00fff4d0-e4a8-4a3a-8906-4c4cdafb59f1"
+ res = self._create_port('json', net1['network']['id'])
+ port = self.deserialize('json', res)
+ self.assertEqual(port['port'][ext_qos.QUEUE], None)
+
+ data = {'port': {'device_id': device_id}}
+ req = self.new_update_request('ports', data,
+ port['port']['id'])
+
+ res = req.get_response(self.api)
+ port = self.deserialize('json', res)
+ self.assertEqual(len(port['port'][ext_qos.QUEUE]), 36)
+
+ def test_get_port_with_qos_not_admin(self):
+ body = {'qos_queue': {'tenant_id': 'not_admin',
+ 'name': 'foo', 'min': 20, 'max': 20}}
+ res = self._create_qos_queue('json', body, tenant_id='not_admin')
+ q1 = self.deserialize('json', res)
+ res = self._create_network('json', 'net1', True,
+ arg_list=(ext_qos.QUEUE, 'tenant_id',),
+ queue_id=q1['qos_queue']['id'],
+ tenant_id="not_admin")
+ net1 = self.deserialize('json', res)
+ self.assertEqual(len(net1['network'][ext_qos.QUEUE]), 36)
+ res = self._create_port('json', net1['network']['id'],
+ tenant_id='not_admin', set_context=True)
+
+ port = self.deserialize('json', res)
+ self.assertEqual(ext_qos.QUEUE not in port['port'], True)
+
+ def test_non_admin_cannot_create_queue(self):
+ body = {'qos_queue': {'tenant_id': 'not_admin',
+ 'name': 'foo', 'min': 20, 'max': 20}}
+ res = self._create_qos_queue('json', body, tenant_id='not_admin',
+ set_context=True)
+ self.assertEqual(res.status_int, 403)
+
+ def test_update_port_non_admin_does_not_show_queue_id(self):
+ body = {'qos_queue': {'tenant_id': 'not_admin',
+ 'name': 'foo', 'min': 20, 'max': 20}}
+ res = self._create_qos_queue('json', body, tenant_id='not_admin')
+ q1 = self.deserialize('json', res)
+ res = self._create_network('json', 'net1', True,
+ arg_list=(ext_qos.QUEUE,),
+ tenant_id='not_admin',
+ queue_id=q1['qos_queue']['id'])
+
+ net1 = self.deserialize('json', res)
+ res = self._create_port('json', net1['network']['id'],
+ tenant_id='not_admin', set_context=True)
+ port = self.deserialize('json', res)
+ device_id = "00fff4d0-e4a8-4a3a-8906-4c4cdafb59f1"
+ data = {'port': {'device_id': device_id}}
+ quantum_context = context.Context('', 'not_admin')
+ port = self._update('ports', port['port']['id'], data,
+ quantum_context=quantum_context)
+ self.assertEqual(ext_qos.QUEUE not in port['port'], True)
+
+ def test_rxtx_factor(self):
+ with self.qos_queue(max=10) as q1:
+
+ res = self._create_network('json', 'net1', True,
+ arg_list=(ext_qos.QUEUE,),
+ queue_id=q1['qos_queue']['id'])
+ net1 = self.deserialize('json', res)
+ res = self._create_port('json', net1['network']['id'],
+ arg_list=(ext_qos.RXTX_FACTOR,),
+ rxtx_factor=2, device_id='1')
+ port = self.deserialize('json', res)
+ req = self.new_show_request('qos-queues',
+ port['port'][ext_qos.QUEUE])
+ res = req.get_response(self.ext_api)
+ queue = self.deserialize('json', res)
+ self.assertEqual(queue['qos_queue']['max'], 20)