--- /dev/null
+# Copyright (c) 2015 Mellanox Technologies, Ltd
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+import collections
+
+from oslo_utils import importutils
+import six
+
+from neutron.agent.l2 import agent_extension
+from neutron.api.rpc.callbacks import resources
+
+
+@six.add_metaclass(abc.ABCMeta)
+class QosAgentDriver(object):
+ """Define stable abstract interface for Qos Agent Driver.
+
+ Qos Agent driver defines the interface to be implemented by Agent
+ for applying Qos Rules on a port.
+ """
+
+ @abc.abstractmethod
+ def initialize(self):
+ """Perform Qos agent driver initialization.
+ """
+ pass
+
+ @abc.abstractmethod
+ def create(self, port, rules):
+ """Apply Qos rules on port for the first time.
+
+ :param port: port object.
+ :param rules: the list of rules to apply on port.
+ """
+ #TODO(Qos) we may want to provide default implementations of calling
+ #delete and then update
+ pass
+
+ @abc.abstractmethod
+ def update(self, port, rules):
+ """Apply Qos rules on port.
+
+ :param port: port object.
+ :param rules: the list of rules to be apply on port.
+ """
+ pass
+
+ @abc.abstractmethod
+ def delete(self, port, rules):
+ """Remove Qos rules from port.
+
+ :param port: port object.
+ :param rules: the list of rules to be removed from port.
+ """
+ pass
+
+
+class QosAgentExtension(agent_extension.AgentCoreResourceExtension):
+ def initialize(self, resource_rpc):
+ """Perform Agent Extension initialization.
+
+ :param resource_rpc: the agent side rpc for getting
+ resource by type and id
+ """
+ super(QosAgentExtension, self).initialize(resource_rpc)
+ #TODO(QoS) - Load it from Config
+ qos_driver_cls = importutils.import_class(
+ 'neutron.plugins.ml2.drivers.openvswitch.agent.'
+ 'extension_drivers.qos_driver.QosOVSAgentDriver')
+ self.qos_driver = qos_driver_cls()
+ self.qos_driver.initialize()
+ self.qos_policy_ports = collections.defaultdict(dict)
+ self.known_ports = set()
+
+ def handle_port(self, context, port):
+ """Handle agent qos extension for port.
+
+ This method subscribes to qos_policy_id changes
+ with a callback and get all the qos_policy_ports and apply
+ them using the qos driver.
+ Updates and delete event should be handle by the registered
+ callback.
+ """
+ port_id = port['port_id']
+ qos_policy_id = port.get('qos_policy_id')
+ if qos_policy_id is None:
+ #TODO(QoS): we should also handle removing policy
+ return
+
+ #Note(moshele) check if we have seen this port
+ #and it has the same policy we do nothing.
+ if (port_id in self.known_ports and
+ port_id in self.qos_policy_ports[qos_policy_id]):
+ return
+
+ self.qos_policy_ports[qos_policy_id][port_id] = port
+ self.known_ports.add(port_id)
+ #TODO(QoS): handle updates when implemented
+ # we have two options:
+ # 1. to add new api for subscribe
+ # registry.subscribe(self._process_rules_updates,
+ # resources.QOS_RULES, qos_policy_id)
+ # 2. combine get_info rpc to also subscribe to the resource
+ qos_rules = self.resource_rpc.get_info(
+ context, resources.QOS_POLICY, qos_policy_id)
+ self._process_rules_updates(
+ port, resources.QOS_POLICY, qos_policy_id,
+ qos_rules, 'create')
+
+ def _process_rules_updates(
+ self, port, resource_type, resource_id,
+ qos_rules, action_type):
+ getattr(self.qos_driver, action_type)(port, qos_rules)
--- /dev/null
+# Copyright (c) 2015 Mellanox Technologies, Ltd
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import copy
+
+import mock
+from oslo_utils import uuidutils
+
+from neutron.agent.l2.extensions import qos_agent
+from neutron.api.rpc.callbacks import resources
+from neutron.tests import base
+
+# This is a minimalistic mock of rules to be passed/checked around
+# which should be exteneded as needed to make real rules
+TEST_GET_INFO_RULES = ['rule1', 'rule2']
+
+
+class QosAgentExtensionTestCase(base.BaseTestCase):
+
+ def setUp(self):
+ super(QosAgentExtensionTestCase, self).setUp()
+ self.qos_agent = qos_agent.QosAgentExtension()
+ self.context = mock.Mock()
+
+ # Force our fake underlying QoS driver
+ #TODO(QoS): change config value when we tie this to a configuration
+ # entry.
+
+ self.import_patcher = mock.patch(
+ 'oslo_utils.importutils.import_class',
+ return_value=mock.Mock())
+ self.import_patcher.start()
+
+ self._create_fake_resource_rpc()
+ self.qos_agent.initialize(self.resource_rpc_mock)
+
+ def _create_fake_resource_rpc(self):
+ self.get_info_mock = mock.Mock(return_value=TEST_GET_INFO_RULES)
+ self.resource_rpc_mock = mock.Mock()
+ self.resource_rpc_mock.get_info = self.get_info_mock
+
+ def _create_test_port_dict(self):
+ return {'port_id': uuidutils.generate_uuid(),
+ 'qos_policy_id': uuidutils.generate_uuid()}
+
+ def test_handle_port_with_no_policy(self):
+ port = self._create_test_port_dict()
+ del port['qos_policy_id']
+ self.qos_agent._process_rules_updates = mock.Mock()
+ self.qos_agent.handle_port(self.context, port)
+ self.assertFalse(self.qos_agent._process_rules_updates.called)
+
+ def test_handle_unknown_port(self):
+ port = self._create_test_port_dict()
+ qos_policy_id = port['qos_policy_id']
+ port_id = port['port_id']
+ self.qos_agent.handle_port(self.context, port)
+ # we make sure the underlaying qos driver is called with the
+ # right parameters
+ self.qos_agent.qos_driver.create.assert_called_once_with(
+ port, TEST_GET_INFO_RULES)
+ self.assertEqual(port,
+ self.qos_agent.qos_policy_ports[qos_policy_id][port_id])
+ self.assertTrue(port_id in self.qos_agent.known_ports)
+
+ def test_handle_known_port(self):
+ port_obj1 = self._create_test_port_dict()
+ port_obj2 = copy.copy(port_obj1)
+ self.qos_agent.handle_port(self.context, port_obj1)
+ self.qos_agent.qos_driver.reset_mock()
+ self.qos_agent.handle_port(self.context, port_obj2)
+ self.assertFalse(self.qos_agent.qos_driver.create.called)
+
+ def test_handle_known_port_change_policy_id(self):
+ port = self._create_test_port_dict()
+ self.qos_agent.handle_port(self.context, port)
+ self.resource_rpc_mock.get_info.reset_mock()
+ port['qos_policy_id'] = uuidutils.generate_uuid()
+ self.qos_agent.handle_port(self.context, port)
+ self.get_info_mock.assert_called_once_with(
+ self.context, resources.QOS_POLICY,
+ port['qos_policy_id'])
+ #TODO(QoS): handle qos_driver.update call check when
+ # we do that