From: Mike Kolesnik Date: Tue, 30 Jun 2015 09:07:48 +0000 (+0300) Subject: Implement QoS plugin X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=4af0de954aabb51560d4c28f8ea246a53d214b20;p=openstack-build%2Fneutron-build.git Implement QoS plugin Initial implementation of the QoS service plugin that just implements CRUD for policy and rule. There are no tests yet. path_prefix is now provided as an attribute to the plugin base, since that's required by the COMMON_PREFIXES removal from master branch. Partially-implements: blueprint quantum-qos-api Change-Id: Icf821dec17f435d8e47e1047fb05225e7dd071f0 --- diff --git a/neutron/extensions/qos.py b/neutron/extensions/qos.py index 16ffa8f7a..23d59eb90 100644 --- a/neutron/extensions/qos.py +++ b/neutron/extensions/qos.py @@ -183,8 +183,9 @@ class Qos(extensions.ExtensionDescriptor): @six.add_metaclass(abc.ABCMeta) class QoSPluginBase(service_base.ServicePluginBase): + path_prefix = QOS_PREFIX + def get_plugin_description(self): - """returns string description of the plugin.""" return "QoS Service Plugin for ports and networks" def get_plugin_type(self): @@ -230,8 +231,8 @@ class QoSPluginBase(service_base.ServicePluginBase): pass @abc.abstractmethod - def update_policy_bandwidth_limit_rule(self, context, rule_id, - policy_id, bandwidth_limit_rule): + def update_policy_bandwidth_limit_rule(self, context, rule_id, policy_id, + bandwidth_limit_rule): pass @abc.abstractmethod diff --git a/neutron/services/qos/qos_plugin.py b/neutron/services/qos/qos_plugin.py index 2beb109ce..6ef13ae62 100644 --- a/neutron/services/qos/qos_plugin.py +++ b/neutron/services/qos/qos_plugin.py @@ -16,9 +16,14 @@ from neutron import manager from neutron.api.rpc.callbacks import registry as rpc_registry -from neutron.api.rpc.callbacks import resources +from neutron.api.rpc.callbacks import resources as rpc_resources +from neutron.callbacks import events +from neutron.callbacks import registry +from neutron.callbacks import resources from neutron.extensions import qos from neutron.i18n import _LW +from neutron.objects.qos import policy as policy_object +from neutron.objects.qos import rule as rule_object from neutron.plugins.common import constants from oslo_log import log as logging @@ -103,78 +108,139 @@ class QoSPlugin(qos.QoSPluginBase): def __init__(self): super(QoSPlugin, self).__init__() self.register_resource_providers() - #self.register_port_callbacks() - #self.register_net_callbacks() - self._inline_test() - - def _inline_test(self): - #TODO(gampel) remove inline unitesting - self.ctx = None - kwargs = {'context': self.ctx} - qos_policy = rpc_registry.get_info( - resources.QOS_POLICY, - "46ebaec0-0570-43ac-82f6-60d2b03168c4", - **kwargs) - - LOG.debug("qos_policy test : %s)", - qos_policy) + self.register_port_callbacks() + self.register_net_callbacks() def register_resource_providers(self): rpc_registry.register_provider( _get_qos_bandwidth_limit_rule_cb_stub, - resources.QOS_RULE) + rpc_resources.QOS_RULE) rpc_registry.register_provider( _get_qos_policy_cb_stub, - resources.QOS_POLICY) + rpc_resources.QOS_POLICY) def register_port_callbacks(self): - # TODO(qos): Register the callbacks to properly manage - # extension of resources - pass + registry.subscribe( + self._extend_port_policy_data, resources.PORT, events.AFTER_READ) + + def _extend_port_policy_data(self, resource, event, trigger, **kwargs): + context = kwargs['context'] + port = kwargs['port'] + policy = policy_object.QosPolicy.get_port_policy(context, port['id']) + port['qos_policy_id'] = policy.id if policy else None + + def update_port_policy(self, context, port): + old_policy = policy_object.QosPolicy.get_port_policy( + context, port['id']) + if old_policy is not None: + #TODO(QoS): this means two transactions. One for detaching + # one for re-attaching, we may want to update + # within a single transaction instead, or put + # a whole transaction on top, or handle the switch + # at db api level automatically within transaction. + old_policy.detach_port(port['id']) + + qos_policy_id = port.get('qos_policy_id') + if qos_policy_id is not None: + policy = self._get_policy_obj(context, qos_policy_id) + policy.attach_port(port['id']) def register_net_callbacks(self): - # TODO(qos): Register the callbacks to properly manage - # extension of resources - pass + registry.subscribe(self._extend_network_policy_data, + resources.NETWORK, + events.AFTER_READ) + + def _extend_network_policy_data(self, resource, event, trigger, **kwargs): + context = kwargs['context'] + network = kwargs['network'] + policy = policy_object.QosPolicy.get_network_policy( + context, network['id']) + network['qos_policy_id'] = policy.id if policy else None + + def update_network_policy(self, context, network): + old_policy = policy_object.QosPolicy.get_network_policy( + context, network['id']) + if old_policy: + old_policy.detach_network(network['id']) + + qos_policy_id = network.get('qos_policy_id') + if qos_policy_id: + policy = self._get_policy_obj(context, qos_policy_id) + policy.attach_network(network['id']) def create_policy(self, context, policy): - pass + policy = policy_object.QosPolicy(context, **policy['policy']) + policy.create() + return policy.to_dict() - def update_policy(self, context, policy_id, policy): - pass + def update_policy(self, context, policy_id, qos_policy): + policy = policy_object.QosPolicy(context, **qos_policy['policy']) + policy.id = policy_id + policy.update() + return policy.to_dict() def delete_policy(self, context, policy_id): - pass + policy = policy_object.QosPolicy(context) + policy.id = policy_id + policy.delete() + + def _get_policy_obj(self, context, policy_id): + return policy_object.QosPolicy.get_by_id(context, policy_id) def get_policy(self, context, policy_id, fields=None): - pass + #TODO(QoS): Support the fields parameter + return self._get_policy_obj(context, policy_id).to_dict() def get_policies(self, context, filters=None, fields=None, sorts=None, limit=None, marker=None, page_reverse=False): - pass - + #TODO(QoS): Support all the optional parameters + return [policy_obj.to_dict() for policy_obj in + policy_object.QosPolicy.get_objects(context)] + + #TODO(QoS): Consider adding a proxy catch-all for rules, so + # we capture the API function call, and just pass + # the rule type as a parameter removing lots of + # future code duplication when we have more rules. def create_policy_bandwidth_limit_rule(self, context, policy_id, bandwidth_limit_rule): - pass + #TODO(QoS): avoid creation of severan bandwidth limit rules + # in the future we need an inter-rule validation + # mechanism to verify all created rules will + # play well together. + rule = rule_object.QosBandwidthLimitRule( + context, qos_policy_id=policy_id, + **bandwidth_limit_rule['bandwidth_limit_rule']) + rule.create() + return rule + + def update_policy_bandwidth_limit_rule(self, context, rule_id, policy_id, + bandwidth_limit_rule): + rule = rule_object.QosBandwidthLimitRule( + context, **bandwidth_limit_rule['bandwidth_limit_rule']) + rule.id = rule_id + rule.update() + return rule - def update_policy_bandwidth_limit_rule(self, context, rule_id, - policy_id, bandwidth_limit_rule): - pass + def delete_policy_bandwidth_limit_rule(self, context, rule_id, policy_id): + rule = rule_object.QosBandwidthLimitRule() + rule.id = rule_id + rule.delete() def get_policy_bandwidth_limit_rule(self, context, rule_id, policy_id, fields=None): - pass - - def delete_policy_bandwidth_limit_rule(self, context, rule_id, policy_id): - pass + #TODO(QoS): Support the fields parameter + return rule_object.QosBandwidthLimitRule.get_by_id(context, + rule_id).to_dict() def get_policy_bandwidth_limit_rules(self, context, policy_id, filters=None, fields=None, sorts=None, limit=None, marker=None, page_reverse=False): - pass + #TODO(QoS): Support all the optional parameters + return [rule_obj.to_dict() for rule_obj in + rule_object.QosBandwidthLimitRule.get_objects(context)] def get_rule_types(self, context, filters=None, fields=None, sorts=None, limit=None,