@six.add_metaclass(abc.ABCMeta)
class QosAgentDriver(object):
- """Define stable abstract interface for Qos Agent Driver.
+ """Define stable abstract interface for QoS Agent Driver.
- Qos Agent driver defines the interface to be implemented by Agent
- for applying Qos Rules on a port.
+ QoS Agent driver defines the interface to be implemented by Agent
+ for applying QoS Rules on a port.
"""
@abc.abstractmethod
def initialize(self):
- """Perform Qos agent driver initialization.
+ """Perform QoS agent driver initialization.
"""
pass
@abc.abstractmethod
- def create(self, port, rules):
- """Apply Qos rules on port for the first time.
+ def create(self, port, qos_policy):
+ """Apply QoS rules on port for the first time.
:param port: port object.
- :param rules: the list of rules to apply on port.
+ :param qos_policy: the QoS policy to be apply on port.
"""
- #TODO(Qos) we may want to provide default implementations of calling
+ #TODO(QoS) we may want to provide default implementations of calling
#delete and then update
pass
@abc.abstractmethod
- def update(self, port, rules):
- """Apply Qos rules on port.
+ def update(self, port, qos_policy):
+ """Apply QoS rules on port.
:param port: port object.
- :param rules: the list of rules to be apply on port.
+ :param qos_policy: the QoS policy to be apply on port.
"""
pass
@abc.abstractmethod
- def delete(self, port, rules):
- """Remove Qos rules from port.
+ def delete(self, port, qos_policy):
+ """Remove QoS rules from port.
:param port: port object.
- :param rules: the list of rules to be removed from port.
+ :param qos_policy: the QoS policy to be removed from port.
"""
pass
self.known_ports = set()
def handle_port(self, context, port):
- """Handle agent qos extension for port.
+ """Handle agent QoS extension for port.
This method subscribes to qos_policy_id changes
with a callback and get all the qos_policy_ports and apply
- them using the qos driver.
+ them using the QoS driver.
Updates and delete event should be handle by the registered
callback.
"""
from oslo_log import log as logging
from neutron.agent.common import ovs_lib
+from neutron.i18n import _LE, _LW
from neutron.agent.l2.extensions import qos_agent
-from neutron.services.qos import qos_consts
+from neutron.plugins.ml2.drivers.openvswitch.mech_driver import (
+ mech_openvswitch)
LOG = logging.getLogger(__name__)
class QosOVSAgentDriver(qos_agent.QosAgentDriver):
+ _SUPPORTED_RULES = (
+ mech_openvswitch.OpenvswitchMechanismDriver.supported_qos_rule_types)
+
def __init__(self):
super(QosOVSAgentDriver, self).__init__()
# TODO(QoS) check if we can get this configuration
# as constructor arguments
self.br_int_name = cfg.CONF.OVS.integration_bridge
self.br_int = None
- self.handlers = {}
def initialize(self):
- self.handlers[('update', qos_consts.RULE_TYPE_BANDWIDTH_LIMIT)] = (
- self._update_bw_limit_rule)
- self.handlers[('create', qos_consts.RULE_TYPE_BANDWIDTH_LIMIT)] = (
- self._update_bw_limit_rule)
- self.handlers[('delete', qos_consts.RULE_TYPE_BANDWIDTH_LIMIT)] = (
- self._delete_bw_limit_rule)
-
self.br_int = ovs_lib.OVSBridge(self.br_int_name)
- def create(self, port, rules):
- self._handle_rules('create', port, rules)
-
- def update(self, port, rules):
- self._handle_rules('update', port, rules)
-
- def delete(self, port, rules):
- self._handle_rules('delete', port, rules)
-
- def _handle_rules(self, action, port, rules):
- for rule in rules:
- handler = self.handlers.get((action, rule.get('type')))
- if handler is not None:
- handler(port, rule)
-
- def _update_bw_limit_rule(self, port, rule):
- port_name = port.get('name')
- max_kbps = rule.get('max_kbps')
- max_burst_kbps = rule.get('max_burst_kbps')
+ def create(self, port, qos_policy):
+ self._handle_rules('create', port, qos_policy)
+
+ def update(self, port, qos_policy):
+ self._handle_rules('update', port, qos_policy)
+
+ def delete(self, port, qos_policy):
+ self._handle_rules('delete', port, qos_policy)
+
+ def _handle_rules(self, action, port, qos_policy):
+ for rule in qos_policy.rules:
+ if rule.rule_type in self._SUPPORTED_RULES:
+ handler_name = ("".join(("_", action, "_", rule.rule_type)))
+ try:
+ handler = getattr(self, handler_name)
+ handler(port, rule)
+ except AttributeError:
+ LOG.error(
+ _LE('Failed to locate a handler for %(rule_type) '
+ 'rules; skipping.'), handler_name)
+ else:
+ LOG.warning(_LW('Unsupported QoS rule type for %(rule_id)s: '
+ '%(rule_type)s; skipping'),
+ {'rule_id': rule.id, 'rule_type': rule.rule_type})
+
+ def _create_bandwidth_limit(self, port, rule):
+ self._update_bandwidth_limit(port, rule)
+
+ def _update_bandwidth_limit(self, port, rule):
+ port_name = port['vif_port'].port_name
+ max_kbps = rule.max_kbps
+ max_burst_kbps = rule.max_burst_kbps
current_max_kbps, current_max_burst = (
self.br_int.get_qos_bw_limit_for_port(port_name))
max_kbps,
max_burst_kbps)
- def _delete_bw_limit_rule(self, port, rule):
- port_name = port.get('name')
+ def _delete_bandwidth_limit(self, port, rule):
+ port_name = port['vif_port'].port_name
current_max_kbps, current_max_burst = (
self.br_int.get_qos_bw_limit_for_port(port_name))
if current_max_kbps is not None or current_max_burst is not None:
if 'port_id' in details:
LOG.info(_LI("Port %(device)s updated. Details: %(details)s"),
{'device': device, 'details': details})
+ details['vif_port'] = port
need_binding = self.treat_vif_port(port, details['port_id'],
details['network_id'],
details['network_type'],
self.setup_arp_spoofing_protection(self.int_br,
port, details)
if need_binding:
- details['vif_port'] = port
need_binding_devices.append(details)
self.agent_extensions_mgr.handle_port(self.context, details)
else:
# under the License.
import mock
+from oslo_utils import uuidutils
+from neutron import context
+from neutron.objects.qos import policy
+from neutron.objects.qos import rule
from neutron.plugins.ml2.drivers.openvswitch.agent.extension_drivers import (
qos_driver)
-from neutron.services.qos import qos_consts
from neutron.tests.unit.plugins.ml2.drivers.openvswitch.agent import (
ovs_test_base)
-class OVSQoSAgentDriverBwLimitRule(ovs_test_base.OVSAgentConfigTestBase):
+class QosOVSAgentDriverTestCase(ovs_test_base.OVSAgentConfigTestBase):
def setUp(self):
- super(OVSQoSAgentDriverBwLimitRule, self).setUp()
+ super(QosOVSAgentDriverTestCase, self).setUp()
+ self.context = context.get_admin_context()
self.qos_driver = qos_driver.QosOVSAgentDriver()
self.qos_driver.initialize()
self.qos_driver.br_int = mock.Mock()
self.delete = self.qos_driver.br_int.del_qos_bw_limit_for_port
self.qos_driver.br_int.create_qos_bw_limit_for_port = mock.Mock()
self.create = self.qos_driver.br_int.create_qos_bw_limit_for_port
- self.rule = self._create_bw_limit_rule()
+ self.rule = self._create_bw_limit_rule_obj()
+ self.qos_policy = self._create_qos_policy_obj([self.rule])
self.port = self._create_fake_port()
- def _create_bw_limit_rule(self):
- return {'type': qos_consts.RULE_TYPE_BANDWIDTH_LIMIT,
- 'max_kbps': '200',
- 'max_burst_kbps': '2'}
+ def _create_bw_limit_rule_obj(self):
+ rule_obj = rule.QosBandwidthLimitRule()
+ rule_obj.id = uuidutils.generate_uuid()
+ rule_obj.max_kbps = 2
+ rule_obj.max_burst_kbps = 200
+ rule_obj.obj_reset_changes()
+ return rule_obj
+
+ def _create_qos_policy_obj(self, rules):
+ policy_dict = {'id': uuidutils.generate_uuid(),
+ 'tenant_id': uuidutils.generate_uuid(),
+ 'name': 'test',
+ 'description': 'test',
+ 'shared': False,
+ 'rules': rules}
+ policy_obj = policy.QosPolicy(self.context, **policy_dict)
+ policy_obj.obj_reset_changes()
+ return policy_obj
def _create_fake_port(self):
- return {'name': 'fakeport'}
+ self.port_name = 'fakeport'
+
+ class FakeVifPort(object):
+ port_name = self.port_name
+
+ return {'vif_port': FakeVifPort()}
def test_create_new_rule(self):
self.qos_driver.br_int.get_qos_bw_limit_for_port = mock.Mock(
return_value=(None, None))
- self.qos_driver.create(self.port, [self.rule])
+ self.qos_driver.create(self.port, self.qos_policy)
# Assert create is the last call
self.assertEqual(
'create_qos_bw_limit_for_port',
self.qos_driver.br_int.method_calls[-1][0])
self.assertEqual(0, self.delete.call_count)
self.create.assert_called_once_with(
- self.port['name'], self.rule['max_kbps'],
- self.rule['max_burst_kbps'])
+ self.port_name, self.rule.max_kbps,
+ self.rule.max_burst_kbps)
def test_create_existing_rules(self):
- self.qos_driver.create(self.port, [self.rule])
+ self.qos_driver.create(self.port, self.qos_policy)
self._assert_rule_create_updated()
def test_update_rules(self):
- self.qos_driver.update(self.port, [self.rule])
+ self.qos_driver.update(self.port, self.qos_policy)
self._assert_rule_create_updated()
def test_delete_rules(self):
- self.qos_driver.delete(self.port, [self.rule])
- self.delete.assert_called_once_with(self.port['name'])
-
- def test_unknown_rule_id(self):
- self.rule['type'] = 'unknown'
- self.qos_driver.create(self.port, [self.rule])
- self.assertEqual(0, self.create.call_count)
- self.assertEqual(0, self.delete.call_count)
+ self.qos_driver.delete(self.port, self.qos_policy)
+ self.delete.assert_called_once_with(self.port_name)
def _assert_rule_create_updated(self):
# Assert create is the last call
'create_qos_bw_limit_for_port',
self.qos_driver.br_int.method_calls[-1][0])
- self.delete.assert_called_once_with(self.port['name'])
+ self.delete.assert_called_once_with(self.port_name)
self.create.assert_called_once_with(
- self.port['name'], self.rule['max_kbps'],
- self.rule['max_burst_kbps'])
+ self.port_name, self.rule.max_kbps,
+ self.rule.max_burst_kbps)
self.assertTrue(self._mock_treat_devices_added_updated(
details, mock.Mock(), 'treat_vif_port'))
+ def test_treat_devices_added_updated_sends_vif_port_into_extension_manager(
+ self, *args):
+ details = mock.MagicMock()
+ details.__contains__.side_effect = lambda x: True
+ port = mock.MagicMock()
+
+ def fake_handle_port(context, port):
+ self.assertIn('vif_port', port)
+
+ with mock.patch.object(self.agent.plugin_rpc,
+ 'get_devices_details_list',
+ return_value=[details]),\
+ mock.patch.object(self.agent.agent_extensions_mgr,
+ 'handle_port', new=fake_handle_port),\
+ mock.patch.object(self.agent.int_br,
+ 'get_vifs_by_ids',
+ return_value={details['device']: port}),\
+ mock.patch.object(self.agent, 'treat_vif_port',
+ return_value=False):
+
+ self.agent.treat_devices_added_or_updated([{}], False)
+
def test_treat_devices_added_updated_skips_if_port_not_found(self):
dev_mock = mock.MagicMock()
dev_mock.__getitem__.return_value = 'the_skipped_one'