]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Implement QoS plugin
authorMike Kolesnik <mkolesni@redhat.com>
Tue, 30 Jun 2015 09:07:48 +0000 (12:07 +0300)
committerIhar Hrachyshka <ihrachys@redhat.com>
Thu, 16 Jul 2015 09:26:36 +0000 (11:26 +0200)
Initial implementation of the QoS service plugin that just implements
CRUD for policy and rule.

There are no tests yet.

path_prefix is now provided as an attribute to the plugin base, since
that's required by the COMMON_PREFIXES removal from master branch.

Partially-implements: blueprint quantum-qos-api
Change-Id: Icf821dec17f435d8e47e1047fb05225e7dd071f0

neutron/extensions/qos.py
neutron/services/qos/qos_plugin.py

index 16ffa8f7a7bdb6d4d909117ca798f3f0b2a99bc9..23d59eb900fcab302e178a70d30412e0c38c9322 100644 (file)
@@ -183,8 +183,9 @@ class Qos(extensions.ExtensionDescriptor):
 @six.add_metaclass(abc.ABCMeta)
 class QoSPluginBase(service_base.ServicePluginBase):
 
+    path_prefix = QOS_PREFIX
+
     def get_plugin_description(self):
-        """returns string description of the plugin."""
         return "QoS Service Plugin for ports and networks"
 
     def get_plugin_type(self):
@@ -230,8 +231,8 @@ class QoSPluginBase(service_base.ServicePluginBase):
         pass
 
     @abc.abstractmethod
-    def update_policy_bandwidth_limit_rule(self, context, rule_id,
-                                           policy_id, bandwidth_limit_rule):
+    def update_policy_bandwidth_limit_rule(self, context, rule_id, policy_id,
+                                           bandwidth_limit_rule):
         pass
 
     @abc.abstractmethod
index 2beb109ceb72007d3b75ac681b8faced58ad30d2..6ef13ae62f51ddcea7318ecc695110297ebc9038 100644 (file)
 from neutron import manager
 
 from neutron.api.rpc.callbacks import registry as rpc_registry
-from neutron.api.rpc.callbacks import resources
+from neutron.api.rpc.callbacks import resources as rpc_resources
+from neutron.callbacks import events
+from neutron.callbacks import registry
+from neutron.callbacks import resources
 from neutron.extensions import qos
 from neutron.i18n import _LW
+from neutron.objects.qos import policy as policy_object
+from neutron.objects.qos import rule as rule_object
 from neutron.plugins.common import constants
 
 from oslo_log import log as logging
@@ -103,78 +108,139 @@ class QoSPlugin(qos.QoSPluginBase):
     def __init__(self):
         super(QoSPlugin, self).__init__()
         self.register_resource_providers()
-        #self.register_port_callbacks()
-        #self.register_net_callbacks()
-        self._inline_test()
-
-    def _inline_test(self):
-        #TODO(gampel) remove inline unitesting
-        self.ctx = None
-        kwargs = {'context': self.ctx}
-        qos_policy = rpc_registry.get_info(
-            resources.QOS_POLICY,
-            "46ebaec0-0570-43ac-82f6-60d2b03168c4",
-            **kwargs)
-
-        LOG.debug("qos_policy test : %s)",
-                  qos_policy)
+        self.register_port_callbacks()
+        self.register_net_callbacks()
 
     def register_resource_providers(self):
         rpc_registry.register_provider(
             _get_qos_bandwidth_limit_rule_cb_stub,
-            resources.QOS_RULE)
+            rpc_resources.QOS_RULE)
 
         rpc_registry.register_provider(
             _get_qos_policy_cb_stub,
-            resources.QOS_POLICY)
+            rpc_resources.QOS_POLICY)
 
     def register_port_callbacks(self):
-        # TODO(qos): Register the callbacks to properly manage
-        #            extension of resources
-        pass
+        registry.subscribe(
+            self._extend_port_policy_data, resources.PORT, events.AFTER_READ)
+
+    def _extend_port_policy_data(self, resource, event, trigger, **kwargs):
+        context = kwargs['context']
+        port = kwargs['port']
+        policy = policy_object.QosPolicy.get_port_policy(context, port['id'])
+        port['qos_policy_id'] = policy.id if policy else None
+
+    def update_port_policy(self, context, port):
+        old_policy = policy_object.QosPolicy.get_port_policy(
+            context, port['id'])
+        if old_policy is not None:
+            #TODO(QoS): this means two transactions. One for detaching
+            #           one for re-attaching, we may want to update
+            #           within a single transaction instead, or put
+            #           a whole transaction on top, or handle the switch
+            #           at db api level automatically within transaction.
+            old_policy.detach_port(port['id'])
+
+        qos_policy_id = port.get('qos_policy_id')
+        if qos_policy_id is not None:
+            policy = self._get_policy_obj(context, qos_policy_id)
+            policy.attach_port(port['id'])
 
     def register_net_callbacks(self):
-        # TODO(qos): Register the callbacks to properly manage
-        #            extension of resources
-        pass
+        registry.subscribe(self._extend_network_policy_data,
+                           resources.NETWORK,
+                           events.AFTER_READ)
+
+    def _extend_network_policy_data(self, resource, event, trigger, **kwargs):
+        context = kwargs['context']
+        network = kwargs['network']
+        policy = policy_object.QosPolicy.get_network_policy(
+            context, network['id'])
+        network['qos_policy_id'] = policy.id if policy else None
+
+    def update_network_policy(self, context, network):
+        old_policy = policy_object.QosPolicy.get_network_policy(
+            context, network['id'])
+        if old_policy:
+            old_policy.detach_network(network['id'])
+
+        qos_policy_id = network.get('qos_policy_id')
+        if qos_policy_id:
+            policy = self._get_policy_obj(context, qos_policy_id)
+            policy.attach_network(network['id'])
 
     def create_policy(self, context, policy):
-        pass
+        policy = policy_object.QosPolicy(context, **policy['policy'])
+        policy.create()
+        return policy.to_dict()
 
-    def update_policy(self, context, policy_id, policy):
-        pass
+    def update_policy(self, context, policy_id, qos_policy):
+        policy = policy_object.QosPolicy(context, **qos_policy['policy'])
+        policy.id = policy_id
+        policy.update()
+        return policy.to_dict()
 
     def delete_policy(self, context, policy_id):
-        pass
+        policy = policy_object.QosPolicy(context)
+        policy.id = policy_id
+        policy.delete()
+
+    def _get_policy_obj(self, context, policy_id):
+        return policy_object.QosPolicy.get_by_id(context, policy_id)
 
     def get_policy(self, context, policy_id, fields=None):
-        pass
+        #TODO(QoS): Support the fields parameter
+        return self._get_policy_obj(context, policy_id).to_dict()
 
     def get_policies(self, context, filters=None, fields=None,
                      sorts=None, limit=None, marker=None,
                      page_reverse=False):
-        pass
-
+        #TODO(QoS): Support all the optional parameters
+        return [policy_obj.to_dict() for policy_obj in
+                policy_object.QosPolicy.get_objects(context)]
+
+    #TODO(QoS): Consider adding a proxy catch-all for rules, so
+    #           we capture the API function call, and just pass
+    #           the rule type as a parameter removing lots of
+    #           future code duplication when we have more rules.
     def create_policy_bandwidth_limit_rule(self, context, policy_id,
                                            bandwidth_limit_rule):
-        pass
+        #TODO(QoS): avoid creation of severan bandwidth limit rules
+        #           in the future we need an inter-rule validation
+        #           mechanism to verify all created rules will
+        #           play well together.
+        rule = rule_object.QosBandwidthLimitRule(
+            context, qos_policy_id=policy_id,
+            **bandwidth_limit_rule['bandwidth_limit_rule'])
+        rule.create()
+        return rule
+
+    def update_policy_bandwidth_limit_rule(self, context, rule_id, policy_id,
+                                           bandwidth_limit_rule):
+        rule = rule_object.QosBandwidthLimitRule(
+            context, **bandwidth_limit_rule['bandwidth_limit_rule'])
+        rule.id = rule_id
+        rule.update()
+        return rule
 
-    def update_policy_bandwidth_limit_rule(self, context, rule_id,
-                                           policy_id, bandwidth_limit_rule):
-        pass
+    def delete_policy_bandwidth_limit_rule(self, context, rule_id, policy_id):
+        rule = rule_object.QosBandwidthLimitRule()
+        rule.id = rule_id
+        rule.delete()
 
     def get_policy_bandwidth_limit_rule(self, context, rule_id,
                                         policy_id, fields=None):
-        pass
-
-    def delete_policy_bandwidth_limit_rule(self, context, rule_id, policy_id):
-        pass
+        #TODO(QoS): Support the fields parameter
+        return rule_object.QosBandwidthLimitRule.get_by_id(context,
+                                                           rule_id).to_dict()
 
     def get_policy_bandwidth_limit_rules(self, context, policy_id,
                                          filters=None, fields=None,
                                          sorts=None, limit=None,
                                          marker=None, page_reverse=False):
-        pass
+        #TODO(QoS): Support all the optional parameters
+        return [rule_obj.to_dict() for rule_obj in
+                rule_object.QosBandwidthLimitRule.get_objects(context)]
 
     def get_rule_types(self, context, filters=None, fields=None,
                        sorts=None, limit=None,