]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Generic rpc callback mechanism which could be reused
authorMiguel Angel Ajo <mangelajo@redhat.com>
Thu, 11 Jun 2015 13:21:28 +0000 (15:21 +0200)
committereran gampel <Eran.Gampel@Huawei.com>
Thu, 2 Jul 2015 12:51:30 +0000 (15:51 +0300)
This is a publisher/subscriber messaging mechanism optimized
for agent consumption and server production without the need
of creating new rpc messages when new resources are introduced.

Oslo versionedobjects are the perfect match to ensure
cross version compatibility even if the published/subscribed
resources format change over time.

This is still a basic stub allowing get_info of the resources,
and the next change will introduce the RPC methods to call
get_info: I0ac8a009e781b6edb283d8634b1a2f047db092dc

The plugin is returning stub objects to be consumed from the
agent to test the basic behaviour until we have DB.

TODO: Update documentation, according to code changes,
      enforce versioned objects only doing deserial/serialization.

Co-Authored-By: Miguel Angel Ajo <mangelajo@redhat.com>
Co-Authored-By: Eran Gampel <eran@gampel.net>
Change-Id: I524cf5a14e99dc6bee4d4261557d98c75efa0809

doc/source/devref/index.rst
doc/source/devref/rpc_callbacks.rst [new file with mode: 0644]
neutron/api/rpc/callbacks/__init__.py [new file with mode: 0644]
neutron/api/rpc/callbacks/events.py [new file with mode: 0644]
neutron/api/rpc/callbacks/registry.py [new file with mode: 0644]
neutron/api/rpc/callbacks/resource_manager.py [new file with mode: 0644]
neutron/api/rpc/callbacks/resources.py [new file with mode: 0644]
neutron/services/qos/qos_plugin.py
neutron/tests/unit/api/rpc/callbacks/__init__.py [new file with mode: 0644]
neutron/tests/unit/api/rpc/callbacks/test_resource_manager.py [new file with mode: 0644]

index e00b1d891f2c586f17fc87713b90fd1c9565826d..e7bc843796b79499aaa970c3e5789de0e619537d 100644 (file)
@@ -46,6 +46,7 @@ Neutron Internals
    plugin-api
    db_layer
    rpc_api
+   rpc_callbacks
    layer3
    l2_agents
    quality_of_service
diff --git a/doc/source/devref/rpc_callbacks.rst b/doc/source/devref/rpc_callbacks.rst
new file mode 100644 (file)
index 0000000..01bc9b6
--- /dev/null
@@ -0,0 +1,229 @@
+=================================
+Neutron Messaging Callback System
+=================================
+
+Neutron already has a callback system [link-to: callbacks.rst] for
+in-process resource callbacks where publishers and subscribers are able
+to publish, subscribe and extend resources.
+
+This system is different, and is intended to be used for inter-process
+callbacks, via the messaging fanout mechanisms.
+
+In Neutron, agents may need to subscribe to specific resource details which
+may change over time. And the purpose of this messaging callback system
+is to allow agent subscription to those resources without the need to extend
+modify existing RPC calls, or creating new RPC messages.
+
+A few resource which can benefit of this system:
+
+* security groups members
+* security group rules,
+* QoS policies.
+
+Using a remote publisher/subscriber pattern, the information about such
+resources could be published using fanout queues to all interested nodes,
+minimizing messaging requests from agents to server since the agents
+get subscribed for their whole lifecycle (unless they unsubscribe).
+
+Within an agent, there could be multiple subscriber callbacks to the same
+resource events, the resources updates would be dispatched to the subscriber
+callbacks from a single message. Any update would come in a single message,
+doing only a single oslo versioned objects deserialization on each receiving
+agent.
+
+This publishing/subscription mechanism is highly dependent on the format
+of the resources passed around. This is why the library only allows
+versioned objects to be published and subscribed. Oslo versioned objects
+allow object version down/up conversion. #[vo_mkcompat]_ #[vo_mkcptests]_
+
+For the VO's versioning schema look here: #[vo_versioning]_
+
+
+
+versioned_objects serialization/deserialization with the
+obj_to_primitive(target_version=..) and primitive_to_obj() #[ov_serdes]_
+methods is used internally to convert/retrieve objects before/after messaging.
+
+Considering rolling upgrades, there are several scenarios to look at:
+
+* publisher (generally neutron-server or a service) and subscriber (agent)
+  know the same version of the objects, so they serialize, and deserialize
+  without issues.
+
+* publisher knows (and sends) an older version of the object, subscriber
+  will get the object updated to latest version on arrival before any
+  callback is called.
+
+* publisher sends a newer version of the object, subscriber won't be able
+  to deserialize the object, in this case (PLEASE DISCUSS), we can think of two
+  strategies:
+
+a) During upgrades, we pin neutron-server to a compatible version for resource
+   fanout updates, and server sends both the old, and the newer version to
+   different topic, queues. Old agents receive the updates on the old version
+   topic, new agents receive updates on the new version topic.
+   When the whole system upgraded, we un-pin the compatible version fanout.
+
+   A variant of this could be using a single fanout queue, and sending the
+   pinned version of the object to all. Newer agents can deserialize to the
+   latest version and upgrade any fields internally. Again at the end, we
+   unpin the version and restart the service.
+
+b) The subscriber will rpc call the publisher to start publishing also a downgraded
+   version of the object on every update on a separate queue. The complication
+   of this version, is the need to ignore new version objects as long as we keep
+   receiving the downgraded ones, and otherwise resend the request to send the
+   downgraded objects after a certain timeout (thinking of the case where the
+   request for downgraded queue is done, but the publisher restarted).
+   This approach is more complicated to implement, but more automated from the
+   administrator point of view. We may want to look into it as a second step
+   from a
+
+c) The subscriber will send a registry.get_info for the latest specific version
+   he knows off. This can have scalability issues during upgrade as any outdated
+   agent will require a flow of two messages (request, and response). This is
+   indeed very bad at scale if you have hundreds or thousands of agents.
+
+Option a seems like a reasonable strategy, similar to what nova does now with
+versioned objects.
+
+Serialized versioned objects look like::
+
+   {'versioned_object.version': '1.0',
+    'versioned_object.name': 'QoSProfile',
+    'versioned_object.data': {'rules': [
+                                        {'versioned_object.version': '1.0',
+                                         'versioned_object.name': 'QoSRule',
+                                         'versioned_object.data': {'name': u'a'},
+                                         'versioned_object.namespace': 'versionedobjects'}
+                                        ],
+                              'uuid': u'abcde',
+                              'name': u'aaa'},
+    'versioned_object.namespace': 'versionedobjects'}
+
+Topic names for the fanout queues
+=================================
+
+if we adopted option a:
+neutron-<resouce_type>_<resource_id>-<vo_version>
+[neutron-<resouce_type>_<resource_id>-<vo_version_compat>]
+
+if we adopted option b for rolling upgrades:
+neutron-<resource_type>-<resource_id>
+neutron-<resource_type>-<resource_id>-<vo_version>
+
+for option c, just:
+neutron-<resource_type>-<resource_id>
+
+Subscribing to resources
+========================
+
+Imagine that you have agent A, which just got to handle a new port, which
+has an associated security group, and QoS policy.
+
+The agent code processing port updates may look like::
+
+    from neutron.rpc_resources import events
+    from neutron.rpc_resources import resources
+    from neutron.rpc_resources import registry
+
+
+    def process_resource_updates(resource_type, resource_id, resource_list, action_type):
+
+        # send to the right handler which will update any control plane
+        # details related to the updated resource...
+
+
+    def port_update(...):
+
+        # here we extract sg_id and qos_policy_id from port..
+
+        registry.subscribe(resources.SG_RULES, sg_id,
+                           callback=process_resource_updates)
+        sg_rules = registry.get_info(resources.SG_RULES, sg_id)
+
+        registry.subscribe(resources.SG_MEMBERS, sg_id,
+                           callback=process_resource_updates)
+        sg_members = registry.get_info(resources.SG_MEMBERS, sg_id)
+
+        registry.subscribe(resources.QOS_RULES, qos_policy_id,
+                           callback=process_resource_updates)
+        qos_rules = registry.get_info(resources.QOS_RULES, qos_policy_id,
+                                      callback=process_resource_updates)
+
+        cleanup_subscriptions()
+
+
+    def cleanup_subscriptions()
+        sg_ids = determine_unreferenced_sg_ids()
+        qos_policy_id = determine_unreferenced_qos_policy_ids()
+        registry.unsubscribe_info(resource.SG_RULES, sg_ids)
+        registry.unsubscribe_info(resource.SG_MEMBERS, sg_ids)
+        registry.unsubscribe_info(resource.QOS_RULES, qos_policy_id)
+
+Another unsubscription strategy could be to lazily unsubscribe resources when
+we receive updates for them, and we discover that they are not needed anymore.
+
+Deleted resources are automatically unsubscribed as we receive the delete event.
+
+NOTE(irenab): this could be extended to core resources like ports, making use
+of the standard neutron in-process callbacks at server side and propagating
+AFTER_UPDATE events, for example, but we may need to wait until those callbacks
+are used with proper versioned objects.
+
+
+Unsubscribing to resources
+==========================
+
+There are a few options to unsubscribe registered callbacks:
+
+* unsubscribe_resource_id(): it selectively unsubscribes an specific
+                             resource type + id.
+* unsubscribe_resource_type(): it unsubscribes from an specific resource type,
+                               any ID.
+* unsubscribe_all(): it unsubscribes all subscribed resources and ids.
+
+
+Sending resource updates
+========================
+
+On the server side, resource updates could come from anywhere, a service plugin,
+an extension, anything that updates the resource and that it's of any interest
+to the agents.
+
+The server/publisher side may look like::
+
+    from neutron.rpc_resources import events
+    from neutron.rpc_resources import resources
+    from neutron.rpc_resources import registry as rpc_registry
+
+    def add_qos_x_rule(...):
+        update_the_db(...)
+        send_rpc_updates_on_qos_policy(qos_policy_id)
+
+    def del_qos_x_rule(...):
+        update_the_db(...)
+        send_rpc_deletion_of_qos_policy(qos_policy_id)
+
+    def send_rpc_updates_on_qos_policy(qos_policy_id):
+        rules = get_qos_policy_rules_versioned_object(qos_policy_id)
+        rpc_registry.notify(resources.QOS_RULES, qos_policy_id, rules, events.UPDATE)
+
+    def send_rpc_deletion_of_qos_policy(qos_policy_id):
+        rpc_registry.notify(resources.QOS_RULES, qos_policy_id, None, events.DELETE)
+
+    # This part is added for the registry mechanism, to be able to request
+    # older versions of the notified objects if any oudated agent requires
+    # them.
+    def retrieve_older_version_callback(qos_policy_id, version):
+        return get_qos_policy_rules_versioned_object(qos_policy_id, version)
+
+    rpc_registry.register_retrieve_callback(resource.QOS_RULES,
+                                            retrieve_older_version_callback)
+
+References
+==========
+.. [#ov_serdes] https://github.com/openstack/oslo.versionedobjects/blob/master/oslo_versionedobjects/tests/test_objects.py#L621
+.. [#vo_mkcompat] https://github.com/openstack/oslo.versionedobjects/blob/master/oslo_versionedobjects/base.py#L460
+.. [#vo_mkcptests] https://github.com/openstack/oslo.versionedobjects/blob/master/oslo_versionedobjects/tests/test_objects.py#L111
+.. [#vo_versioning] https://github.com/openstack/oslo.versionedobjects/blob/master/oslo_versionedobjects/base.py#L236
diff --git a/neutron/api/rpc/callbacks/__init__.py b/neutron/api/rpc/callbacks/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/neutron/api/rpc/callbacks/events.py b/neutron/api/rpc/callbacks/events.py
new file mode 100644 (file)
index 0000000..ff8193d
--- /dev/null
@@ -0,0 +1,19 @@
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+UPDATED = 'updated'
+DELETED = 'deleted'
+
+VALID = (
+    UPDATED,
+    DELETED
+)
diff --git a/neutron/api/rpc/callbacks/registry.py b/neutron/api/rpc/callbacks/registry.py
new file mode 100644 (file)
index 0000000..fcf663e
--- /dev/null
@@ -0,0 +1,68 @@
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from neutron.api.rpc.callbacks import resource_manager
+
+# TODO(ajo): consider adding locking
+CALLBACK_MANAGER = None
+
+
+def _get_resources_callback_manager():
+    global CALLBACK_MANAGER
+    if CALLBACK_MANAGER is None:
+        CALLBACK_MANAGER = resource_manager.ResourcesCallbacksManager()
+    return CALLBACK_MANAGER
+
+
+#resource implementation callback registration functions
+def get_info(resource_type, resource_id, **kwargs):
+    """Get information about resource type with resource id.
+
+    The function will check the providers for an specific remotable
+    resource and get the resource.
+
+    :returns: an oslo versioned object.
+    """
+    callback = _get_resources_callback_manager().get_callback(resource_type)
+    if callback:
+        return callback(resource_type, resource_id, **kwargs)
+
+
+def register_provider(callback, resource_type):
+    _get_resources_callback_manager().register(callback, resource_type)
+
+
+# resource RPC callback for pub/sub
+#Agent side
+def subscribe(callback, resource_type, resource_id):
+    #TODO(QoS): we have to finish the real update notifications
+    raise NotImplementedError("we should finish update notifications")
+
+
+def unsubscribe(callback, resource_type, resource_id):
+    #TODO(QoS): we have to finish the real update notifications
+    raise NotImplementedError("we should finish update notifications")
+
+
+def unsubscribe_all():
+    #TODO(QoS): we have to finish the real update notifications
+    raise NotImplementedError("we should finish update notifications")
+
+
+#Server side
+def notify(resource_type, event, obj):
+    #TODO(QoS): we have to finish the real update notifications
+    raise NotImplementedError("we should finish update notifications")
+
+
+def clear():
+    _get_resources_callback_manager().clear()
diff --git a/neutron/api/rpc/callbacks/resource_manager.py b/neutron/api/rpc/callbacks/resource_manager.py
new file mode 100644 (file)
index 0000000..02e940f
--- /dev/null
@@ -0,0 +1,69 @@
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import collections
+
+from oslo_log import log as logging
+
+from neutron.api.rpc.callbacks import resources
+from neutron.callbacks import exceptions
+
+LOG = logging.getLogger(__name__)
+
+
+class ResourcesCallbacksManager(object):
+    """A callback system that allows information providers in a loose manner.
+    """
+
+    def __init__(self):
+        self.clear()
+
+    def register(self, callback, resource):
+        """register callback for a resource .
+
+        One callback can be register to a resource
+
+        :param callback: the callback. It must raise or return a dict.
+        :param resource: the resource. It must be a valid resource.
+        """
+        LOG.debug("register: %(callback)s %(resource)s",
+                  {'callback': callback, 'resource': resource})
+        if resource not in resources.VALID:
+            raise exceptions.Invalid(element='resource', value=resource)
+
+        self._callbacks[resource] = callback
+
+    def unregister(self, resource):
+        """Unregister callback from the registry.
+
+        :param callback: the callback.
+        :param resource: the resource.
+        """
+        LOG.debug("Unregister: %(resource)s",
+                  {'resource': resource})
+        if resource not in resources.VALID:
+            raise exceptions.Invalid(element='resource', value=resource)
+        self._callbacks[resource] = None
+
+    def clear(self):
+        """Brings the manager to a clean slate."""
+        self._callbacks = collections.defaultdict(dict)
+
+    def get_callback(self, resource):
+        """Return the callback if found, None otherwise.
+
+        :param resource: the resource. It must be a valid resource.
+        """
+        if resource not in resources.VALID:
+            raise exceptions.Invalid(element='resource', value=resource)
+
+        return self._callbacks[resource]
diff --git a/neutron/api/rpc/callbacks/resources.py b/neutron/api/rpc/callbacks/resources.py
new file mode 100644 (file)
index 0000000..027dde2
--- /dev/null
@@ -0,0 +1,19 @@
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+QOS_POLICY = 'qos-policy'
+QOS_RULE = 'qos-rule'
+
+VALID = (
+    QOS_POLICY,
+    QOS_RULE,
+)
index bc866ae01b157dd3e739df43856914cbe4ce0eb0..a60abcc72371fc447c4c6613b335c7cdbee6dd81 100644 (file)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+from neutron import manager
+
+from neutron.api.rpc.callbacks import registry as rpc_registry
+from neutron.api.rpc.callbacks import resources
 from neutron.extensions import qos
+from neutron.i18n import _LW
+from neutron.plugins.common import constants
+
+from oslo_log import log as logging
+
+
+LOG = logging.getLogger(__name__)
+
+
+#TODO(QoS): remove this stub when db is ready
+def _get_qos_policy_cb_stub(resource, policy_id, **kwargs):
+    """Hardcoded stub for testing until we get the db working."""
+    qos_policy = {
+        "tenant_id": "8d4c70a21fed4aeba121a1a429ba0d04",
+        "id": "46ebaec0-0570-43ac-82f6-60d2b03168c4",
+        "name": "10Mbit",
+        "description": "This policy limits the ports to 10Mbit max.",
+        "shared": False,
+        "rules": [{
+            "id": "5f126d84-551a-4dcf-bb01-0e9c0df0c793",
+            "max_kbps": "10000",
+            "max_burst_kbps": "0",
+            "type": "bandwidth_limit"
+        }]
+    }
+    return qos_policy
+
+
+def _get_qos_policy_cb(resource, policy_id, **kwargs):
+    qos_plugin = manager.NeutronManager.get_service_plugins().get(
+        constants.QOS)
+    context = kwargs.get('context')
+    if context is None:
+        LOG.warning(_LW(
+            'Received %(resource)s %(policy_id)s without context'),
+            {'resource': resource, 'policy_id': policy_id}
+        )
+        return
+
+    qos_policy = qos_plugin.get_qos_policy(context, policy_id)
+    return qos_policy
+
+
+#TODO(QoS): remove this stub when db is ready
+def _get_qos_bandwidth_limit_rule_cb_stub(resource, rule_id, **kwargs):
+    """Hardcoded for testing until we get the db working."""
+    bandwidth_limit = {
+        "id": "5f126d84-551a-4dcf-bb01-0e9c0df0c793",
+        "qos_policy_id": "46ebaec0-0570-43ac-82f6-60d2b03168c4",
+        "max_kbps": "10000",
+        "max_burst_kbps": "0",
+    }
+    return bandwidth_limit
+
+
+def _get_qos_bandwidth_limit_rule_cb(resource, rule_id, **kwargs):
+    qos_plugin = manager.NeutronManager.get_service_plugins().get(
+        constants.QOS)
+    context = kwargs.get('context')
+    if context is None:
+        LOG.warning(_LW(
+            'Received %(resource)s %(rule_id,)s without context '),
+            {'resource': resource, 'rule_id,': rule_id}
+        )
+        return
+
+    bandwidth_limit = qos_plugin.get_qos_bandwidth_limit_rule(
+                                        context,
+                                        rule_id)
+    return bandwidth_limit
 
 
 class QoSPlugin(qos.QoSPluginBase):
@@ -28,16 +102,31 @@ class QoSPlugin(qos.QoSPluginBase):
 
     def __init__(self):
         super(QoSPlugin, self).__init__()
-        #self.register_rpc()
+        self.register_resource_providers()
         #self.register_port_callbacks()
         #self.register_net_callbacks()
-
-    def register_rpc(self):
-        # RPC support
-        # TODO(ajo): register ourselves to the generic RPC framework
-        #            so we will provide QoS information for ports and
-        #            networks.
-        pass
+        self._inline_test()
+
+    def _inline_test(self):
+        #TODO(gampel) remove inline unitesting
+        self.ctx = None
+        kwargs = {'context': self.ctx}
+        qos_policy = rpc_registry.get_info(
+            resources.QOS_POLICY,
+            "46ebaec0-0570-43ac-82f6-60d2b03168c4",
+            **kwargs)
+
+        LOG.debug("qos_policy test : %s)",
+                  qos_policy)
+
+    def register_resource_providers(self):
+        rpc_registry.register_provider(
+            _get_qos_bandwidth_limit_rule_cb_stub,
+            resources.QOS_RULE)
+
+        rpc_registry.register_provider(
+            _get_qos_policy_cb_stub,
+            resources.QOS_POLICY)
 
     def register_port_callbacks(self):
         # TODO(qos): Register the callbacks to properly manage
diff --git a/neutron/tests/unit/api/rpc/callbacks/__init__.py b/neutron/tests/unit/api/rpc/callbacks/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/neutron/tests/unit/api/rpc/callbacks/test_resource_manager.py b/neutron/tests/unit/api/rpc/callbacks/test_resource_manager.py
new file mode 100644 (file)
index 0000000..f68e02d
--- /dev/null
@@ -0,0 +1,78 @@
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+
+from neutron.api.rpc.callbacks import registry as rpc_registry
+from neutron.api.rpc.callbacks import resources
+
+
+from neutron.tests import base
+
+
+class ResourcesCallbackRequestTestCase(base.BaseTestCase):
+
+    def setUp(self):
+        super(ResourcesCallbackRequestTestCase, self).setUp()
+        self.resource_id = '46ebaec0-0570-43ac-82f6-60d2b03168c4'
+        self.qos_rule_id = '5f126d84-551a-4dcf-bb01-0e9c0df0c793'
+
+    def test_resource_callback_request(self):
+
+        #TODO(QoS) convert it to the version object format
+        def _get_qos_policy_cb(resource, policy_id, **kwargs):
+            qos_policy = {
+                "tenant_id": "8d4c70a21fed4aeba121a1a429ba0d04",
+                "id": "46ebaec0-0570-43ac-82f6-60d2b03168c4",
+                "name": "10Mbit",
+                "description": "This policy limits the ports to 10Mbit max.",
+                "shared": False,
+                "rules": [{
+                    "id": "5f126d84-551a-4dcf-bb01-0e9c0df0c793",
+                    "max_kbps": "10000",
+                    "max_burst_kbps": "0",
+                    "type": "bnadwidth_limit"
+                }]
+            }
+            return qos_policy
+
+        #TODO(QoS) convert it to the version object format
+        def _get_qos_bandwidth_limit_rule_cb(resource, rule_id, **kwargs):
+            bandwidth_limit = {
+                "id": "5f126d84-551a-4dcf-bb01-0e9c0df0c793",
+                "qos_policy_id": "46ebaec0-0570-43ac-82f6-60d2b03168c4",
+                "max_kbps": "10000",
+                "max_burst_kbps": "0",
+            }
+            return bandwidth_limit
+
+        rpc_registry.register_provider(
+                        _get_qos_bandwidth_limit_rule_cb,
+                        resources.QOS_RULE)
+
+        rpc_registry.register_provider(
+            _get_qos_policy_cb,
+            resources.QOS_POLICY)
+
+        self.ctx = None
+        kwargs = {'context': self.ctx}
+
+        qos_policy = rpc_registry.get_info(
+            resources.QOS_POLICY,
+            self.resource_id,
+            **kwargs)
+        self.assertEqual(self.resource_id, qos_policy['id'])
+
+        qos_rule = rpc_registry.get_info(
+            resources.QOS_RULE,
+            self.qos_rule_id,
+            **kwargs)
+        self.assertEqual(self.qos_rule_id, qos_rule['id'])