]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
neutron.api.rpc.callbacks interface rework
authorMiguel Angel Ajo <mangelajo@redhat.com>
Fri, 24 Jul 2015 00:45:35 +0000 (02:45 +0200)
committerIhar Hrachyshka <ihrachys@redhat.com>
Sat, 8 Aug 2015 08:24:20 +0000 (10:24 +0200)
Split rpc.callbacks interface into consumer and producer parts.

Better terms are chosen for two RPC APIs we have:
- pull when a component actively requests a new object state;
- push when a component updates anyone interested about an object
  change.

Also, for callback registration, the following terms are used:
- subscribe when a component is registered in consumer registry;
- provide when a component is registered in provider registry.

Covered the registries with some unit tests.

Lots of existing tests utilize the registries now, and need to be
isolated from other tests that mess with the managers (that are
singletons), so introduced a common qos base test class to mock the
manager with per-test instance of it).

Co-Authored-By: Ihar Hrachyshka <ihrachys@redhat.com>
Partially-Implements: blueprint quantum-qos-api
Change-Id: I130cfbc8b78da6df4405b90ea1ab47899491ba41

26 files changed:
doc/source/devref/rpc_callbacks.rst
neutron/agent/l2/extensions/qos.py
neutron/api/rpc/callbacks/consumer/__init__.py [new file with mode: 0644]
neutron/api/rpc/callbacks/consumer/registry.py [new file with mode: 0644]
neutron/api/rpc/callbacks/events.py
neutron/api/rpc/callbacks/exceptions.py [new file with mode: 0644]
neutron/api/rpc/callbacks/producer/__init__.py [new file with mode: 0644]
neutron/api/rpc/callbacks/producer/registry.py [new file with mode: 0644]
neutron/api/rpc/callbacks/registry.py [deleted file]
neutron/api/rpc/callbacks/resource_manager.py
neutron/api/rpc/handlers/resources_rpc.py
neutron/plugins/ml2/plugin.py
neutron/services/qos/notification_drivers/message_queue.py
neutron/services/qos/qos_plugin.py
neutron/tests/unit/agent/l2/extensions/test_qos.py
neutron/tests/unit/api/rpc/callbacks/consumer/__init__.py [new file with mode: 0644]
neutron/tests/unit/api/rpc/callbacks/consumer/test_registry.py [new file with mode: 0644]
neutron/tests/unit/api/rpc/callbacks/producer/__init__.py [new file with mode: 0644]
neutron/tests/unit/api/rpc/callbacks/producer/test_registry.py [new file with mode: 0644]
neutron/tests/unit/api/rpc/callbacks/test_registry.py [deleted file]
neutron/tests/unit/api/rpc/callbacks/test_resource_manager.py
neutron/tests/unit/api/rpc/handlers/test_resources_rpc.py
neutron/tests/unit/services/qos/base.py [new file with mode: 0644]
neutron/tests/unit/services/qos/notification_drivers/test_manager.py
neutron/tests/unit/services/qos/notification_drivers/test_message_queue.py
neutron/tests/unit/services/qos/test_qos_plugin.py

index 01bc9b6c9c6627b750aeff4caa50c1008ef358a3..f72672482b39516f295450eb7421869830e526aa 100644 (file)
@@ -4,7 +4,7 @@ Neutron Messaging Callback System
 
 Neutron already has a callback system [link-to: callbacks.rst] for
 in-process resource callbacks where publishers and subscribers are able
-to publish, subscribe and extend resources.
+to publish and subscribe for resource events.
 
 This system is different, and is intended to be used for inter-process
 callbacks, via the messaging fanout mechanisms.
@@ -16,12 +16,11 @@ modify existing RPC calls, or creating new RPC messages.
 
 A few resource which can benefit of this system:
 
-* security groups members
-* security group rules,
-* QoS policies.
+* QoS policies;
+* Security Groups.
 
 Using a remote publisher/subscriber pattern, the information about such
-resources could be published using fanout queues to all interested nodes,
+resources could be published using fanout messages to all interested nodes,
 minimizing messaging requests from agents to server since the agents
 get subscribed for their whole lifecycle (unless they unsubscribe).
 
@@ -38,8 +37,6 @@ allow object version down/up conversion. #[vo_mkcompat]_ #[vo_mkcptests]_
 
 For the VO's versioning schema look here: #[vo_versioning]_
 
-
-
 versioned_objects serialization/deserialization with the
 obj_to_primitive(target_version=..) and primitive_to_obj() #[ov_serdes]_
 methods is used internally to convert/retrieve objects before/after messaging.
@@ -58,42 +55,21 @@ Considering rolling upgrades, there are several scenarios to look at:
   to deserialize the object, in this case (PLEASE DISCUSS), we can think of two
   strategies:
 
-a) During upgrades, we pin neutron-server to a compatible version for resource
-   fanout updates, and server sends both the old, and the newer version to
-   different topic, queues. Old agents receive the updates on the old version
-   topic, new agents receive updates on the new version topic.
-   When the whole system upgraded, we un-pin the compatible version fanout.
-
-   A variant of this could be using a single fanout queue, and sending the
-   pinned version of the object to all. Newer agents can deserialize to the
-   latest version and upgrade any fields internally. Again at the end, we
-   unpin the version and restart the service.
-
-b) The subscriber will rpc call the publisher to start publishing also a downgraded
-   version of the object on every update on a separate queue. The complication
-   of this version, is the need to ignore new version objects as long as we keep
-   receiving the downgraded ones, and otherwise resend the request to send the
-   downgraded objects after a certain timeout (thinking of the case where the
-   request for downgraded queue is done, but the publisher restarted).
-   This approach is more complicated to implement, but more automated from the
-   administrator point of view. We may want to look into it as a second step
-   from a
-
-c) The subscriber will send a registry.get_info for the latest specific version
-   he knows off. This can have scalability issues during upgrade as any outdated
-   agent will require a flow of two messages (request, and response). This is
-   indeed very bad at scale if you have hundreds or thousands of agents.
-
-Option a seems like a reasonable strategy, similar to what nova does now with
-versioned objects.
+
+The strategy for upgrades will be:
+   During upgrades, we pin neutron-server to a compatible version for resource
+   fanout updates, and the server sends both the old, and the newer version.
+   The new agents process updates, taking the newer version of the resource
+   fanout updates.  When the whole system upgraded, we un-pin the compatible
+   version fanout.
 
 Serialized versioned objects look like::
 
    {'versioned_object.version': '1.0',
-    'versioned_object.name': 'QoSProfile',
+    'versioned_object.name': 'QoSPolicy',
     'versioned_object.data': {'rules': [
                                         {'versioned_object.version': '1.0',
-                                         'versioned_object.name': 'QoSRule',
+                                         'versioned_object.name': 'QoSBandwidthLimitRule',
                                          'versioned_object.data': {'name': u'a'},
                                          'versioned_object.namespace': 'versionedobjects'}
                                         ],
@@ -101,19 +77,18 @@ Serialized versioned objects look like::
                               'name': u'aaa'},
     'versioned_object.namespace': 'versionedobjects'}
 
-Topic names for the fanout queues
-=================================
+Topic names for every resource type RPC endpoint
+================================================
+
+neutron-vo-<resource_class_name>-<version>
 
-if we adopted option a:
-neutron-<resouce_type>_<resource_id>-<vo_version>
-[neutron-<resouce_type>_<resource_id>-<vo_version_compat>]
+In the future, we may want to get oslo messaging to support subscribing
+topics dynamically, then we may want to use:
 
-if we adopted option b for rolling upgrades:
-neutron-<resource_type>-<resource_id>
-neutron-<resource_type>-<resource_id>-<vo_version>
+neutron-vo-<resource_class_name>-<resource_id>-<version> instead,
 
-for option c, just:
-neutron-<resource_type>-<resource_id>
+or something equivalent which would allow fine granularity for the receivers
+to only get interesting information to them.
 
 Subscribing to resources
 ========================
@@ -123,103 +98,86 @@ has an associated security group, and QoS policy.
 
 The agent code processing port updates may look like::
 
-    from neutron.rpc_resources import events
-    from neutron.rpc_resources import resources
-    from neutron.rpc_resources import registry
+    from neutron.api.rpc.callbacks.consumer import registry
+    from neutron.api.rpc.callbacks import events
+    from neutron.api.rpc.callbacks import resources
 
 
-    def process_resource_updates(resource_type, resource_id, resource_list, action_type):
+    def process_resource_updates(resource_type, resource, event_type):
 
         # send to the right handler which will update any control plane
         # details related to the updated resource...
 
 
-    def port_update(...):
+    def subscribe_resources():
+        registry.subscribe(process_resource_updates, resources.SEC_GROUP)
 
-        # here we extract sg_id and qos_policy_id from port..
+        registry.subscribe(process_resource_updates, resources.QOS_POLICY)
 
-        registry.subscribe(resources.SG_RULES, sg_id,
-                           callback=process_resource_updates)
-        sg_rules = registry.get_info(resources.SG_RULES, sg_id)
+    def port_update(port):
 
-        registry.subscribe(resources.SG_MEMBERS, sg_id,
-                           callback=process_resource_updates)
-        sg_members = registry.get_info(resources.SG_MEMBERS, sg_id)
+        # here we extract sg_id and qos_policy_id from port..
 
-        registry.subscribe(resources.QOS_RULES, qos_policy_id,
-                           callback=process_resource_updates)
-        qos_rules = registry.get_info(resources.QOS_RULES, qos_policy_id,
-                                      callback=process_resource_updates)
+        sec_group = registry.pull(resources.SEC_GROUP, sg_id)
+        qos_policy = registry.pull(resources.QOS_POLICY, qos_policy_id)
 
-        cleanup_subscriptions()
 
+The relevant function is:
 
-    def cleanup_subscriptions()
-        sg_ids = determine_unreferenced_sg_ids()
-        qos_policy_id = determine_unreferenced_qos_policy_ids()
-        registry.unsubscribe_info(resource.SG_RULES, sg_ids)
-        registry.unsubscribe_info(resource.SG_MEMBERS, sg_ids)
-        registry.unsubscribe_info(resource.QOS_RULES, qos_policy_id)
+* subscribe(callback, resource_type): subscribes callback to a resource type.
 
-Another unsubscription strategy could be to lazily unsubscribe resources when
-we receive updates for them, and we discover that they are not needed anymore.
 
-Deleted resources are automatically unsubscribed as we receive the delete event.
+The callback function will receive the following arguments:
 
-NOTE(irenab): this could be extended to core resources like ports, making use
-of the standard neutron in-process callbacks at server side and propagating
-AFTER_UPDATE events, for example, but we may need to wait until those callbacks
-are used with proper versioned objects.
+* resource_type: the type of resource which is receiving the update.
+* resource: resource of supported object
+* event_type: will be one of CREATED, UPDATED, or DELETED, see
+  neutron.api.rpc.callbacks.events for details.
 
+With the underlaying oslo_messaging support for dynamic topics on the receiver
+we cannot implement a per "resource type + resource id" topic, rabbitmq seems
+to handle 10000's of topics without suffering, but creating 100's of
+oslo_messaging receivers on different topics seems to crash.
 
-Unsubscribing to resources
-==========================
+We may want to look into that later, to avoid agents receiving resource updates
+which are uninteresting to them.
 
-There are a few options to unsubscribe registered callbacks:
+Unsubscribing from resources
+============================
 
-* unsubscribe_resource_id(): it selectively unsubscribes an specific
-                             resource type + id.
-* unsubscribe_resource_type(): it unsubscribes from an specific resource type,
-                               any ID.
-* unsubscribe_all(): it unsubscribes all subscribed resources and ids.
+To unsubscribe registered callbacks:
 
+* unsubscribe(callback, resource_type): unsubscribe from specific resource type.
+* unsubscribe_all(): unsubscribe from all resources.
 
-Sending resource updates
-========================
+
+Sending resource events
+=======================
 
 On the server side, resource updates could come from anywhere, a service plugin,
-an extension, anything that updates the resource and that it's of any interest
-to the agents.
+an extension, anything that updates, creates, or destroys the resource and that
+is of any interest to subscribed agents.
 
 The server/publisher side may look like::
 
-    from neutron.rpc_resources import events
-    from neutron.rpc_resources import resources
-    from neutron.rpc_resources import registry as rpc_registry
+    from neutron.api.rpc.callbacks.producer import registry
+    from neutron.api.rpc.callbacks import events
 
-    def add_qos_x_rule(...):
+    def create_qos_policy(...):
+        policy = fetch_policy(...)
         update_the_db(...)
-        send_rpc_updates_on_qos_policy(qos_policy_id)
+        registry.push(policy, events.CREATED)
 
-    def del_qos_x_rule(...):
+    def update_qos_policy(...):
+        policy = fetch_policy(...)
         update_the_db(...)
-        send_rpc_deletion_of_qos_policy(qos_policy_id)
+        registry.push(policy, events.UPDATED)
 
-    def send_rpc_updates_on_qos_policy(qos_policy_id):
-        rules = get_qos_policy_rules_versioned_object(qos_policy_id)
-        rpc_registry.notify(resources.QOS_RULES, qos_policy_id, rules, events.UPDATE)
-
-    def send_rpc_deletion_of_qos_policy(qos_policy_id):
-        rpc_registry.notify(resources.QOS_RULES, qos_policy_id, None, events.DELETE)
-
-    # This part is added for the registry mechanism, to be able to request
-    # older versions of the notified objects if any oudated agent requires
-    # them.
-    def retrieve_older_version_callback(qos_policy_id, version):
-        return get_qos_policy_rules_versioned_object(qos_policy_id, version)
+    def delete_qos_policy(...):
+        policy = fetch_policy(...)
+        update_the_db(...)
+        registry.push(policy, events.DELETED)
 
-    rpc_registry.register_retrieve_callback(resource.QOS_RULES,
-                                            retrieve_older_version_callback)
 
 References
 ==========
index f3442c8ea2f9938434a0f4dae8665fd3c5320763..6483d5aa9f0e163974dc90c56e8cf7f4bd3a7577 100644 (file)
@@ -76,7 +76,7 @@ class QosAgentExtension(agent_extension.AgentCoreResourceExtension):
         """
         super(QosAgentExtension, self).initialize()
 
-        self.resource_rpc = resources_rpc.ResourcesServerRpcApi()
+        self.resource_rpc = resources_rpc.ResourcesPullRpcApi()
         self.qos_driver = manager.NeutronManager.load_class_for_provider(
             'neutron.qos.agent_drivers', cfg.CONF.qos.agent_driver)()
         self.qos_driver.initialize()
@@ -111,8 +111,8 @@ class QosAgentExtension(agent_extension.AgentCoreResourceExtension):
         # 1. to add new api for subscribe
         #    registry.subscribe(self._process_policy_updates,
         #                   resources.QOS_POLICY, qos_policy_id)
-        # 2. combine get_info rpc to also subscribe to the resource
-        qos_policy = self.resource_rpc.get_info(
+        # 2. combine pull rpc to also subscribe to the resource
+        qos_policy = self.resource_rpc.pull(
             context,
             resources.QOS_POLICY,
             qos_policy_id)
diff --git a/neutron/api/rpc/callbacks/consumer/__init__.py b/neutron/api/rpc/callbacks/consumer/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/neutron/api/rpc/callbacks/consumer/registry.py b/neutron/api/rpc/callbacks/consumer/registry.py
new file mode 100644 (file)
index 0000000..454e423
--- /dev/null
@@ -0,0 +1,44 @@
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from oslo_log import log as logging
+
+from neutron.api.rpc.callbacks import resource_manager
+
+
+LOG = logging.getLogger(__name__)
+
+
+#TODO(ajo): consider adding locking to _get_manager, it's
+#           safe for eventlet, but not for normal threading.
+def _get_manager():
+    return resource_manager.ConsumerResourceCallbacksManager()
+
+
+def subscribe(callback, resource_type):
+    _get_manager().register(callback, resource_type)
+
+
+def unsubscribe(callback, resource_type):
+    _get_manager().unregister(callback, resource_type)
+
+
+def push(resource_type, resource, event_type):
+    """Push resource events into all registered callbacks for the type."""
+
+    callbacks = _get_manager().get_callbacks(resource_type)
+    for callback in callbacks:
+        callback(resource_type, resource, event_type)
+
+
+def clear():
+    _get_manager().clear()
index ff8193d9ed1214c637bb79a598290745b72942b6..485a1bc801e1dc0c7e7e300aaa1e54aedf645319 100644 (file)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+CREATED = 'created'
 UPDATED = 'updated'
 DELETED = 'deleted'
 
 VALID = (
+    CREATED,
     UPDATED,
     DELETED
 )
diff --git a/neutron/api/rpc/callbacks/exceptions.py b/neutron/api/rpc/callbacks/exceptions.py
new file mode 100644 (file)
index 0000000..9e17474
--- /dev/null
@@ -0,0 +1,25 @@
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from neutron.common import exceptions
+
+
+class CallbackWrongResourceType(exceptions.NeutronException):
+    message = _('Callback for %(resource_type)s returned wrong resource type')
+
+
+class CallbackNotFound(exceptions.NeutronException):
+    message = _('Callback for %(resource_type)s not found')
+
+
+class CallbacksMaxLimitReached(exceptions.NeutronException):
+    message = _("Cannot add multiple callbacks for %(resource_type)s")
diff --git a/neutron/api/rpc/callbacks/producer/__init__.py b/neutron/api/rpc/callbacks/producer/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/neutron/api/rpc/callbacks/producer/registry.py b/neutron/api/rpc/callbacks/producer/registry.py
new file mode 100644 (file)
index 0000000..b19a8bf
--- /dev/null
@@ -0,0 +1,62 @@
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from oslo_log import log as logging
+
+from neutron.api.rpc.callbacks import exceptions
+from neutron.api.rpc.callbacks import resource_manager
+from neutron.objects import base
+
+
+LOG = logging.getLogger(__name__)
+
+
+# TODO(ajo): consider adding locking: it's safe for eventlet but not
+#            for other types of threading.
+def _get_manager():
+    return resource_manager.ProducerResourceCallbacksManager()
+
+
+def provide(callback, resource_type):
+    """Register a callback as a producer for the resource type.
+
+    This callback will be used to produce resources of corresponding type for
+    interested parties.
+    """
+    _get_manager().register(callback, resource_type)
+
+
+def unprovide(callback, resource_type):
+    """Unregister a callback for corresponding resource type."""
+    _get_manager().unregister(callback, resource_type)
+
+
+def clear():
+    """Clear all callbacks."""
+    _get_manager().clear()
+
+
+def pull(resource_type, resource_id, **kwargs):
+    """Get resource object that corresponds to resource id.
+
+    The function will return an object that is provided by resource producer.
+
+    :returns: NeutronObject
+    """
+    callback = _get_manager().get_callback(resource_type)
+    obj = callback(resource_type, resource_id, **kwargs)
+    if obj:
+        if (not isinstance(obj, base.NeutronObject) or
+            resource_type != obj.obj_name()):
+            raise exceptions.CallbackWrongResourceType(
+                resource_type=resource_type)
+    return obj
diff --git a/neutron/api/rpc/callbacks/registry.py b/neutron/api/rpc/callbacks/registry.py
deleted file mode 100644 (file)
index de13298..0000000
+++ /dev/null
@@ -1,87 +0,0 @@
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-
-from neutron.api.rpc.callbacks import resource_manager
-from neutron.api.rpc.callbacks import resources
-from neutron.common import exceptions
-
-
-# TODO(ajo): consider adding locking
-CALLBACK_MANAGER = None
-
-
-def _get_resources_callback_manager():
-    global CALLBACK_MANAGER
-    if CALLBACK_MANAGER is None:
-        CALLBACK_MANAGER = resource_manager.ResourcesCallbacksManager()
-    return CALLBACK_MANAGER
-
-
-class CallbackReturnedWrongObjectType(exceptions.NeutronException):
-    message = _('Callback for %(resource_type)s returned wrong object type')
-
-
-class CallbackNotFound(exceptions.NeutronException):
-    message = _('Callback for %(resource_type)s not found')
-
-
-#resource implementation callback registration functions
-def get_info(resource_type, resource_id, **kwargs):
-    """Get information about resource type with resource id.
-
-    The function will check the providers for a specific remotable
-    resource and get the resource.
-
-    :returns: NeutronObject
-    """
-    callback = _get_resources_callback_manager().get_callback(resource_type)
-    if not callback:
-        raise CallbackNotFound(resource_type=resource_type)
-
-    obj = callback(resource_type, resource_id, **kwargs)
-    if obj:
-        expected_cls = resources.get_resource_cls(resource_type)
-        if not isinstance(obj, expected_cls):
-            raise CallbackReturnedWrongObjectType(
-                resource_type=resource_type)
-    return obj
-
-
-def register_provider(callback, resource_type):
-    _get_resources_callback_manager().register(callback, resource_type)
-
-
-# resource RPC callback for pub/sub
-#Agent side
-def subscribe(callback, resource_type, resource_id):
-    #TODO(QoS): we have to finish the real update notifications
-    raise NotImplementedError("we should finish update notifications")
-
-
-def unsubscribe(callback, resource_type, resource_id):
-    #TODO(QoS): we have to finish the real update notifications
-    raise NotImplementedError("we should finish update notifications")
-
-
-def unsubscribe_all():
-    #TODO(QoS): we have to finish the real update notifications
-    raise NotImplementedError("we should finish update notifications")
-
-
-#Server side
-def notify(resource_type, event, obj):
-    #TODO(QoS): we have to finish the real update notifications
-    raise NotImplementedError("we should finish update notifications")
-
-
-def clear():
-    _get_resources_callback_manager().clear()
index f28326fef7242e295f3567ddad4ba414f0ec07bb..63f898033588f6107a5b1ab940bc29b341782364 100644 (file)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import abc
 import collections
 
 from oslo_log import log as logging
+import six
 
+from neutron.api.rpc.callbacks import exceptions as rpc_exc
 from neutron.api.rpc.callbacks import resources
 from neutron.callbacks import exceptions
 
 LOG = logging.getLogger(__name__)
 
+# TODO(QoS): split the registry/resources_rpc modules into two separate things:
+# one for pull and one for push APIs
 
-class ResourcesCallbacksManager(object):
+
+def _validate_resource_type(resource_type):
+    if not resources.is_valid_resource_type(resource_type):
+        raise exceptions.Invalid(element='resource', value=resource_type)
+
+
+@six.add_metaclass(abc.ABCMeta)
+class ResourceCallbacksManager(object):
     """A callback system that allows information providers in a loose manner.
     """
 
-    def __init__(self):
-        self.clear()
+    # This hook is to allow tests to get new objects for the class
+    _singleton = True
+
+    def __new__(cls, *args, **kwargs):
+        if not cls._singleton:
+            return super(ResourceCallbacksManager, cls).__new__(cls)
+
+        if not hasattr(cls, '_instance'):
+            cls._instance = super(ResourceCallbacksManager, cls).__new__(cls)
+        return cls._instance
+
+    @abc.abstractmethod
+    def _add_callback(self, callback, resource_type):
+        pass
+
+    @abc.abstractmethod
+    def _delete_callback(self, callback, resource_type):
+        pass
 
     def register(self, callback, resource_type):
         """Register a callback for a resource type.
 
-        Only one callback can be registered for a resource type.
-
         :param callback: the callback. It must raise or return NeutronObject.
         :param resource_type: must be a valid resource type.
         """
-        LOG.debug("register: %(callback)s %(resource_type)s",
-                  {'callback': callback, 'resource_type': resource_type})
-        if not resources.is_valid_resource_type(resource_type):
-            raise exceptions.Invalid(element='resource', value=resource_type)
+        LOG.debug("Registering callback for %s", resource_type)
+        _validate_resource_type(resource_type)
+        self._add_callback(callback, resource_type)
 
-        self._callbacks[resource_type] = callback
-
-    def unregister(self, resource_type):
+    def unregister(self, callback, resource_type):
         """Unregister callback from the registry.
 
-        :param resource: must be a valid resource type.
+        :param callback: the callback.
+        :param resource_type: must be a valid resource type.
         """
-        LOG.debug("Unregister: %s", resource_type)
-        if not resources.is_valid_resource_type(resource_type):
-            raise exceptions.Invalid(element='resource', value=resource_type)
-        self._callbacks[resource_type] = None
+        LOG.debug("Unregistering callback for %s", resource_type)
+        _validate_resource_type(resource_type)
+        self._delete_callback(callback, resource_type)
 
+    @abc.abstractmethod
     def clear(self):
         """Brings the manager to a clean state."""
-        self._callbacks = collections.defaultdict(dict)
+
+    def get_subscribed_types(self):
+        return list(self._callbacks.keys())
+
+
+class ProducerResourceCallbacksManager(ResourceCallbacksManager):
+
+    _callbacks = dict()
+
+    def _add_callback(self, callback, resource_type):
+        if resource_type in self._callbacks:
+            raise rpc_exc.CallbacksMaxLimitReached(resource_type=resource_type)
+        self._callbacks[resource_type] = callback
+
+    def _delete_callback(self, callback, resource_type):
+        try:
+            del self._callbacks[resource_type]
+        except KeyError:
+            raise rpc_exc.CallbackNotFound(resource_type=resource_type)
+
+    def clear(self):
+        self._callbacks = dict()
 
     def get_callback(self, resource_type):
+        _validate_resource_type(resource_type)
+        try:
+            return self._callbacks[resource_type]
+        except KeyError:
+            raise rpc_exc.CallbackNotFound(resource_type=resource_type)
+
+
+class ConsumerResourceCallbacksManager(ResourceCallbacksManager):
+
+    _callbacks = collections.defaultdict(set)
+
+    def _add_callback(self, callback, resource_type):
+        self._callbacks[resource_type].add(callback)
+
+    def _delete_callback(self, callback, resource_type):
+        try:
+            self._callbacks[resource_type].remove(callback)
+            if not self._callbacks[resource_type]:
+                del self._callbacks[resource_type]
+        except KeyError:
+            raise rpc_exc.CallbackNotFound(resource_type=resource_type)
+
+    def clear(self):
+        self._callbacks = collections.defaultdict(set)
+
+    def get_callbacks(self, resource_type):
         """Return the callback if found, None otherwise.
 
         :param resource_type: must be a valid resource type.
         """
-        if not resources.is_valid_resource_type(resource_type):
-            raise exceptions.Invalid(element='resource', value=resource_type)
-
-        return self._callbacks[resource_type]
+        _validate_resource_type(resource_type)
+        callbacks = self._callbacks[resource_type]
+        if not callbacks:
+            raise rpc_exc.CallbackNotFound(resource_type=resource_type)
+        return callbacks
index 6c801e5dc2a112b533faddfd16efd3fb7d198140..eed2dfde0763aeeda900885805dac5e9ba589326 100755 (executable)
@@ -17,7 +17,7 @@ from oslo_log import helpers as log_helpers
 from oslo_log import log as logging
 import oslo_messaging
 
-from neutron.api.rpc.callbacks import registry
+from neutron.api.rpc.callbacks.producer import registry
 from neutron.api.rpc.callbacks import resources
 from neutron.common import constants
 from neutron.common import exceptions
@@ -46,14 +46,20 @@ def _validate_resource_type(resource_type):
         raise InvalidResourceTypeClass(resource_type=resource_type)
 
 
-class ResourcesServerRpcApi(object):
+class ResourcesPullRpcApi(object):
     """Agent-side RPC (stub) for agent-to-plugin interaction.
 
     This class implements the client side of an rpc interface.  The server side
-    can be found below: ResourcesServerRpcCallback.  For more information on
+    can be found below: ResourcesPullRpcCallback.  For more information on
     this RPC interface, see doc/source/devref/rpc_callbacks.rst.
     """
 
+    def __new__(cls):
+        # make it a singleton
+        if not hasattr(cls, '_instance'):
+            cls._instance = super(ResourcesPullRpcApi, cls).__new__(cls)
+        return cls._instance
+
     def __init__(self):
         target = oslo_messaging.Target(
             topic=topics.PLUGIN, version='1.0',
@@ -61,7 +67,7 @@ class ResourcesServerRpcApi(object):
         self.client = n_rpc.get_client(target)
 
     @log_helpers.log_method_call
-    def get_info(self, context, resource_type, resource_id):
+    def pull(self, context, resource_type, resource_id):
         _validate_resource_type(resource_type)
 
         # we've already validated the resource type, so we are pretty sure the
@@ -69,7 +75,7 @@ class ResourcesServerRpcApi(object):
         resource_type_cls = resources.get_resource_cls(resource_type)
 
         cctxt = self.client.prepare()
-        primitive = cctxt.call(context, 'get_info',
+        primitive = cctxt.call(context, 'pull',
             resource_type=resource_type,
             version=resource_type_cls.VERSION, resource_id=resource_id)
 
@@ -82,11 +88,11 @@ class ResourcesServerRpcApi(object):
         return obj
 
 
-class ResourcesServerRpcCallback(object):
+class ResourcesPullRpcCallback(object):
     """Plugin-side RPC (implementation) for agent-to-plugin interaction.
 
     This class implements the server side of an rpc interface.  The client side
-    can be found above: ResourcesServerRpcApi.  For more information on
+    can be found above: ResourcesPullRpcApi.  For more information on
     this RPC interface, see doc/source/devref/rpc_callbacks.rst.
     """
 
@@ -96,14 +102,10 @@ class ResourcesServerRpcCallback(object):
     target = oslo_messaging.Target(
         version='1.0', namespace=constants.RPC_NAMESPACE_RESOURCES)
 
-    def get_info(self, context, resource_type, version, resource_id):
+    def pull(self, context, resource_type, version, resource_id):
         _validate_resource_type(resource_type)
 
-        obj = registry.get_info(
-            resource_type,
-            resource_id,
-            context=context)
-
+        obj = registry.pull(resource_type, resource_id, context=context)
         if obj:
             # don't request a backport for the latest known version
             if version == obj.VERSION:
index cdcd3a61a2c48d2fc71576dc41a0006a8119aba0..85b9f4837609262deafe2853fea5104275b0ae9a 100644 (file)
@@ -164,7 +164,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
             dhcp_rpc.DhcpRpcCallback(),
             agents_db.AgentExtRpcCallback(),
             metadata_rpc.MetadataRpcCallback(),
-            resources_rpc.ResourcesServerRpcCallback()
+            resources_rpc.ResourcesPullRpcCallback()
         ]
 
     def _setup_dhcp(self):
index d430730a6d08cda4fbcac44acf3cbe9068fc45f6..aa804f72306c96a0d255e2c005b93d930ff98f1f 100644 (file)
@@ -12,8 +12,7 @@
 
 from oslo_log import log as logging
 
-from neutron.api.rpc.callbacks import events
-from neutron.api.rpc.callbacks import registry as rpc_registry
+from neutron.api.rpc.callbacks.producer import registry
 from neutron.api.rpc.callbacks import resources
 from neutron.i18n import _LW
 from neutron.objects.qos import policy as policy_object
@@ -41,9 +40,7 @@ class RpcQosServiceNotificationDriver(
     """RPC message queue service notification driver for QoS."""
 
     def __init__(self):
-        rpc_registry.register_provider(
-            _get_qos_policy_cb,
-            resources.QOS_POLICY)
+        registry.provide(_get_qos_policy_cb, resources.QOS_POLICY)
 
     def get_description(self):
         return "Message queue updates"
@@ -53,19 +50,9 @@ class RpcQosServiceNotificationDriver(
         pass
 
     def update_policy(self, policy):
-        # TODO(QoS): this is temporary until we get notify() implemented
-        try:
-            rpc_registry.notify(resources.QOS_POLICY,
-                                events.UPDATED,
-                                policy)
-        except NotImplementedError:
-            pass
+        # TODO(QoS): implement notification
+        pass
 
     def delete_policy(self, policy):
-        # TODO(QoS): this is temporary until we get notify() implemented
-        try:
-            rpc_registry.notify(resources.QOS_POLICY,
-                                events.DELETED,
-                                policy)
-        except NotImplementedError:
-            pass
+        # TODO(QoS): implement notification
+        pass
index 9073d712bc9121ce51857f8ea40d8df2deb5da15..0b91d46b9c21f3723206953fd9366049fbb73d6e 100644 (file)
@@ -60,8 +60,8 @@ class QoSPlugin(qos.QoSPluginBase):
     def delete_policy(self, context, policy_id):
         policy = policy_object.QosPolicy(context)
         policy.id = policy_id
-        self.notification_driver_manager.delete_policy(policy)
         policy.delete()
+        self.notification_driver_manager.delete_policy(policy)
 
     def _get_policy_obj(self, context, policy_id):
         obj = policy_object.QosPolicy.get_by_id(context, policy_id)
index 8772394bdb1c055f9b776c1f6a7f0a797341cde0..006044bf36998c371e8713291fd7568aa1ba2332 100755 (executable)
@@ -23,7 +23,7 @@ from neutron.tests import base
 
 # This is a minimalistic mock of rules to be passed/checked around
 # which should be exteneded as needed to make real rules
-TEST_GET_INFO_RULES = ['rule1', 'rule2']
+TEST_GET_RESOURCE_RULES = ['rule1', 'rule2']
 
 
 class QosAgentExtensionTestCase(base.BaseTestCase):
@@ -40,11 +40,10 @@ class QosAgentExtensionTestCase(base.BaseTestCase):
         ).start()
 
         self.qos_ext.initialize()
-        self._create_fake_resource_rpc()
 
-    def _create_fake_resource_rpc(self):
-        self.get_info_mock = mock.Mock(return_value=TEST_GET_INFO_RULES)
-        self.qos_ext.resource_rpc.get_info = self.get_info_mock
+        self.pull_mock = mock.patch.object(
+            self.qos_ext.resource_rpc, 'pull',
+            return_value=TEST_GET_RESOURCE_RULES).start()
 
     def _create_test_port_dict(self):
         return {'port_id': uuidutils.generate_uuid(),
@@ -65,7 +64,7 @@ class QosAgentExtensionTestCase(base.BaseTestCase):
         # we make sure the underlaying qos driver is called with the
         # right parameters
         self.qos_ext.qos_driver.create.assert_called_once_with(
-            port, TEST_GET_INFO_RULES)
+            port, TEST_GET_RESOURCE_RULES)
         self.assertEqual(port,
             self.qos_ext.qos_policy_ports[qos_policy_id][port_id])
         self.assertTrue(port_id in self.qos_ext.known_ports)
@@ -81,10 +80,10 @@ class QosAgentExtensionTestCase(base.BaseTestCase):
     def test_handle_known_port_change_policy_id(self):
         port = self._create_test_port_dict()
         self.qos_ext.handle_port(self.context, port)
-        self.qos_ext.resource_rpc.get_info.reset_mock()
+        self.qos_ext.resource_rpc.pull.reset_mock()
         port['qos_policy_id'] = uuidutils.generate_uuid()
         self.qos_ext.handle_port(self.context, port)
-        self.get_info_mock.assert_called_once_with(
+        self.pull_mock.assert_called_once_with(
              self.context, resources.QOS_POLICY,
              port['qos_policy_id'])
         #TODO(QoS): handle qos_driver.update call check when
diff --git a/neutron/tests/unit/api/rpc/callbacks/consumer/__init__.py b/neutron/tests/unit/api/rpc/callbacks/consumer/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/neutron/tests/unit/api/rpc/callbacks/consumer/test_registry.py b/neutron/tests/unit/api/rpc/callbacks/consumer/test_registry.py
new file mode 100644 (file)
index 0000000..5d18e53
--- /dev/null
@@ -0,0 +1,56 @@
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import mock
+
+from neutron.api.rpc.callbacks.consumer import registry
+from neutron.tests import base
+
+
+class ConsumerRegistryTestCase(base.BaseTestCase):
+
+    def setUp(self):
+        super(ConsumerRegistryTestCase, self).setUp()
+
+    def test__get_manager_is_singleton(self):
+        self.assertIs(registry._get_manager(), registry._get_manager())
+
+    @mock.patch.object(registry, '_get_manager')
+    def test_subscribe(self, manager_mock):
+        callback = lambda: None
+        registry.subscribe(callback, 'TYPE')
+        manager_mock().register.assert_called_with(callback, 'TYPE')
+
+    @mock.patch.object(registry, '_get_manager')
+    def test_unsubscribe(self, manager_mock):
+        callback = lambda: None
+        registry.unsubscribe(callback, 'TYPE')
+        manager_mock().unregister.assert_called_with(callback, 'TYPE')
+
+    @mock.patch.object(registry, '_get_manager')
+    def test_clear(self, manager_mock):
+        registry.clear()
+        manager_mock().clear.assert_called_with()
+
+    @mock.patch.object(registry, '_get_manager')
+    def test_push(self, manager_mock):
+        resource_type_ = object()
+        resource_ = object()
+        event_type_ = object()
+
+        callback1 = mock.Mock()
+        callback2 = mock.Mock()
+        callbacks = {callback1, callback2}
+        manager_mock().get_callbacks.return_value = callbacks
+        registry.push(resource_type_, resource_, event_type_)
+        for callback in callbacks:
+            callback.assert_called_with(resource_type_, resource_, event_type_)
diff --git a/neutron/tests/unit/api/rpc/callbacks/producer/__init__.py b/neutron/tests/unit/api/rpc/callbacks/producer/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/neutron/tests/unit/api/rpc/callbacks/producer/test_registry.py b/neutron/tests/unit/api/rpc/callbacks/producer/test_registry.py
new file mode 100644 (file)
index 0000000..5b7b049
--- /dev/null
@@ -0,0 +1,81 @@
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from neutron.api.rpc.callbacks import exceptions
+from neutron.api.rpc.callbacks.producer import registry
+from neutron.api.rpc.callbacks import resources
+from neutron.objects.qos import policy
+from neutron.tests.unit.services.qos import base
+
+
+class ProducerRegistryTestCase(base.BaseQosTestCase):
+
+    def test_pull_returns_callback_result(self):
+        policy_obj = policy.QosPolicy(context=None)
+
+        def _fake_policy_cb(*args, **kwargs):
+            return policy_obj
+
+        registry.provide(_fake_policy_cb, resources.QOS_POLICY)
+
+        self.assertEqual(
+            policy_obj,
+            registry.pull(resources.QOS_POLICY, 'fake_id'))
+
+    def test_pull_does_not_raise_on_none(self):
+        def _none_cb(*args, **kwargs):
+            pass
+
+        registry.provide(_none_cb, resources.QOS_POLICY)
+
+        obj = registry.pull(resources.QOS_POLICY, 'fake_id')
+        self.assertIsNone(obj)
+
+    def test_pull_raises_on_wrong_object_type(self):
+        def _wrong_type_cb(*args, **kwargs):
+            return object()
+
+        registry.provide(_wrong_type_cb, resources.QOS_POLICY)
+
+        self.assertRaises(
+            exceptions.CallbackWrongResourceType,
+            registry.pull, resources.QOS_POLICY, 'fake_id')
+
+    def test_pull_raises_on_callback_not_found(self):
+        self.assertRaises(
+            exceptions.CallbackNotFound,
+            registry.pull, resources.QOS_POLICY, 'fake_id')
+
+    def test__get_manager_is_singleton(self):
+        self.assertIs(registry._get_manager(), registry._get_manager())
+
+    def test_unprovide(self):
+        def _fake_policy_cb(*args, **kwargs):
+            pass
+
+        registry.provide(_fake_policy_cb, resources.QOS_POLICY)
+        registry.unprovide(_fake_policy_cb, resources.QOS_POLICY)
+
+        self.assertRaises(
+            exceptions.CallbackNotFound,
+            registry.pull, resources.QOS_POLICY, 'fake_id')
+
+    def test_clear_unprovides_all_producers(self):
+        def _fake_policy_cb(*args, **kwargs):
+            pass
+
+        registry.provide(_fake_policy_cb, resources.QOS_POLICY)
+        registry.clear()
+
+        self.assertRaises(
+            exceptions.CallbackNotFound,
+            registry.pull, resources.QOS_POLICY, 'fake_id')
diff --git a/neutron/tests/unit/api/rpc/callbacks/test_registry.py b/neutron/tests/unit/api/rpc/callbacks/test_registry.py
deleted file mode 100644 (file)
index 3c12b38..0000000
+++ /dev/null
@@ -1,63 +0,0 @@
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-
-import mock
-
-from neutron.api.rpc.callbacks import registry
-from neutron.api.rpc.callbacks import resource_manager
-from neutron.api.rpc.callbacks import resources
-from neutron.objects.qos import policy
-from neutron.tests import base
-
-
-class GetInfoTestCase(base.BaseTestCase):
-    def setUp(self):
-        super(GetInfoTestCase, self).setUp()
-        mgr = resource_manager.ResourcesCallbacksManager()
-        mgr_p = mock.patch.object(
-            registry, '_get_resources_callback_manager', return_value=mgr)
-        mgr_p.start()
-
-    def test_returns_callback_result(self):
-        policy_obj = policy.QosPolicy(context=None)
-
-        def _fake_policy_cb(*args, **kwargs):
-            return policy_obj
-
-        registry.register_provider(_fake_policy_cb, resources.QOS_POLICY)
-
-        self.assertEqual(policy_obj,
-                         registry.get_info(resources.QOS_POLICY, 'fake_id'))
-
-    def test_does_not_raise_on_none(self):
-        def _wrong_type_cb(*args, **kwargs):
-            pass
-
-        registry.register_provider(_wrong_type_cb, resources.QOS_POLICY)
-
-        obj = registry.get_info(resources.QOS_POLICY, 'fake_id')
-        self.assertIsNone(obj)
-
-    def test_raises_on_wrong_object_type(self):
-        def _wrong_type_cb(*args, **kwargs):
-            return object()
-
-        registry.register_provider(_wrong_type_cb, resources.QOS_POLICY)
-
-        self.assertRaises(
-            registry.CallbackReturnedWrongObjectType,
-            registry.get_info, resources.QOS_POLICY, 'fake_id')
-
-    def test_raises_on_callback_not_found(self):
-        self.assertRaises(
-            registry.CallbackNotFound,
-            registry.get_info, resources.QOS_POLICY, 'fake_id')
index bc708dbbd28d488ea1079a8c10f3e6e190b599fd..79d5ed55c5a6cad60677ad482eb3da2292886c66 100644 (file)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import mock
 
-from neutron.api.rpc.callbacks import registry as rpc_registry
-from neutron.api.rpc.callbacks import resources
-from neutron.objects.qos import policy
-from neutron.objects.qos import rule
+from neutron.api.rpc.callbacks import exceptions as rpc_exc
+from neutron.api.rpc.callbacks import resource_manager
+from neutron.callbacks import exceptions as exceptions
+from neutron.tests.unit.services.qos import base
 
+IS_VALID_RESOURCE_TYPE = (
+    'neutron.api.rpc.callbacks.resources.is_valid_resource_type')
 
-from neutron.tests import base
 
+class ResourceCallbacksManagerTestCaseMixin(object):
 
-class ResourcesCallbackRequestTestCase(base.BaseTestCase):
+    def test_register_fails_on_invalid_type(self):
+        self.assertRaises(
+            exceptions.Invalid,
+            self.mgr.register, lambda: None, 'TYPE')
+
+    @mock.patch(IS_VALID_RESOURCE_TYPE, return_value=True)
+    def test_clear_unregisters_all_callbacks(self, *mocks):
+        self.mgr.register(lambda: None, 'TYPE1')
+        self.mgr.register(lambda: None, 'TYPE2')
+        self.mgr.clear()
+        self.assertEqual([], self.mgr.get_subscribed_types())
+
+    def test_unregister_fails_on_invalid_type(self):
+        self.assertRaises(
+            exceptions.Invalid,
+            self.mgr.unregister, lambda: None, 'TYPE')
+
+    @mock.patch(IS_VALID_RESOURCE_TYPE, return_value=True)
+    def test_unregister_fails_on_unregistered_callback(self, *mocks):
+        self.assertRaises(
+            rpc_exc.CallbackNotFound,
+            self.mgr.unregister, lambda: None, 'TYPE')
+
+    @mock.patch(IS_VALID_RESOURCE_TYPE, return_value=True)
+    def test_unregister_unregisters_callback(self, *mocks):
+        callback = lambda: None
+        self.mgr.register(callback, 'TYPE')
+        self.mgr.unregister(callback, 'TYPE')
+        self.assertEqual([], self.mgr.get_subscribed_types())
+
+    @mock.patch(IS_VALID_RESOURCE_TYPE, return_value=True)
+    def test___init___does_not_reset_callbacks(self, *mocks):
+        callback = lambda: None
+        self.mgr.register(callback, 'TYPE')
+        resource_manager.ProducerResourceCallbacksManager()
+        self.assertEqual(['TYPE'], self.mgr.get_subscribed_types())
+
+
+class ProducerResourceCallbacksManagerTestCase(
+    base.BaseQosTestCase, ResourceCallbacksManagerTestCaseMixin):
+
+    def setUp(self):
+        super(ProducerResourceCallbacksManagerTestCase, self).setUp()
+        self.mgr = self.prod_mgr
+
+    @mock.patch(IS_VALID_RESOURCE_TYPE, return_value=True)
+    def test_register_registers_callback(self, *mocks):
+        callback = lambda: None
+        self.mgr.register(callback, 'TYPE')
+        self.assertEqual(callback, self.mgr.get_callback('TYPE'))
+
+    @mock.patch(IS_VALID_RESOURCE_TYPE, return_value=True)
+    def test_register_fails_on_multiple_calls(self, *mocks):
+        self.mgr.register(lambda: None, 'TYPE')
+        self.assertRaises(
+            rpc_exc.CallbacksMaxLimitReached,
+            self.mgr.register, lambda: None, 'TYPE')
+
+    def test_get_callback_fails_on_invalid_type(self):
+        self.assertRaises(
+            exceptions.Invalid,
+            self.mgr.get_callback, 'TYPE')
+
+    @mock.patch(IS_VALID_RESOURCE_TYPE, return_value=True)
+    def test_get_callback_fails_on_unregistered_callback(
+            self, *mocks):
+        self.assertRaises(
+            rpc_exc.CallbackNotFound,
+            self.mgr.get_callback, 'TYPE')
+
+    @mock.patch(IS_VALID_RESOURCE_TYPE, return_value=True)
+    def test_get_callback_returns_proper_callback(self, *mocks):
+        callback1 = lambda: None
+        callback2 = lambda: None
+        self.mgr.register(callback1, 'TYPE1')
+        self.mgr.register(callback2, 'TYPE2')
+        self.assertEqual(callback1, self.mgr.get_callback('TYPE1'))
+        self.assertEqual(callback2, self.mgr.get_callback('TYPE2'))
+
+
+class ConsumerResourceCallbacksManagerTestCase(
+    base.BaseQosTestCase, ResourceCallbacksManagerTestCaseMixin):
 
     def setUp(self):
-        super(ResourcesCallbackRequestTestCase, self).setUp()
-        self.resource_id = '46ebaec0-0570-43ac-82f6-60d2b03168c4'
-        self.qos_rule_id = '5f126d84-551a-4dcf-bb01-0e9c0df0c793'
-
-    def test_resource_callback_request(self):
-
-        def _get_qos_policy_cb(resource, policy_id, **kwargs):
-            context = kwargs.get('context')
-            qos_policy = policy.QosPolicy(context,
-                tenant_id="8d4c70a21fed4aeba121a1a429ba0d04",
-                id="46ebaec0-0570-43ac-82f6-60d2b03168c4",
-                name="10Mbit",
-                description="This policy limits the ports to 10Mbit max.",
-                shared=False,
-                rules=[
-                    rule.QosBandwidthLimitRule(context,
-                        id="5f126d84-551a-4dcf-bb01-0e9c0df0c793",
-                        max_kbps=10000,
-                        max_burst_kbps=0)
-                ]
-            )
-            qos_policy.obj_reset_changes()
-            return qos_policy
-
-        rpc_registry.register_provider(
-            _get_qos_policy_cb,
-            resources.QOS_POLICY)
-
-        self.ctx = None
-        kwargs = {'context': self.ctx}
-
-        qos_policy = rpc_registry.get_info(
-            resources.QOS_POLICY,
-            self.resource_id,
-            **kwargs)
-        self.assertEqual(self.resource_id, qos_policy['id'])
+        super(ConsumerResourceCallbacksManagerTestCase, self).setUp()
+        self.mgr = self.cons_mgr
+
+    @mock.patch(IS_VALID_RESOURCE_TYPE, return_value=True)
+    def test_register_registers_callback(self, *mocks):
+        callback = lambda: None
+        self.mgr.register(callback, 'TYPE')
+        self.assertEqual({callback}, self.mgr.get_callbacks('TYPE'))
+
+    @mock.patch(IS_VALID_RESOURCE_TYPE, return_value=True)
+    def test_register_succeeds_on_multiple_calls(self, *mocks):
+        callback1 = lambda: None
+        callback2 = lambda: None
+        self.mgr.register(callback1, 'TYPE')
+        self.mgr.register(callback2, 'TYPE')
+
+    @mock.patch(IS_VALID_RESOURCE_TYPE, return_value=True)
+    def test_get_callbacks_fails_on_unregistered_callback(
+        self, *mocks):
+        self.assertRaises(
+            rpc_exc.CallbackNotFound,
+            self.mgr.get_callbacks, 'TYPE')
+
+    @mock.patch(IS_VALID_RESOURCE_TYPE, return_value=True)
+    def test_get_callbacks_returns_proper_callbacks(self, *mocks):
+        callback1 = lambda: None
+        callback2 = lambda: None
+        self.mgr.register(callback1, 'TYPE1')
+        self.mgr.register(callback2, 'TYPE2')
+        self.assertEqual(set([callback1]), self.mgr.get_callbacks('TYPE1'))
+        self.assertEqual(set([callback2]), self.mgr.get_callbacks('TYPE2'))
index 3d1104c408d31595b9c97eea576cbae4864ce2ad..f7b52201f6f509e590a75b1bebfda7b0b71802e2 100755 (executable)
@@ -42,55 +42,59 @@ class ResourcesRpcBaseTestCase(base.BaseTestCase):
         return policy_obj
 
 
-class ResourcesServerRpcApiTestCase(ResourcesRpcBaseTestCase):
+class ResourcesPullRpcApiTestCase(ResourcesRpcBaseTestCase):
 
     def setUp(self):
-        super(ResourcesServerRpcApiTestCase, self).setUp()
+        super(ResourcesPullRpcApiTestCase, self).setUp()
         self.client_p = mock.patch.object(resources_rpc.n_rpc, 'get_client')
         self.client = self.client_p.start()
-        self.rpc = resources_rpc.ResourcesServerRpcApi()
+        self.rpc = resources_rpc.ResourcesPullRpcApi()
         self.mock_cctxt = self.rpc.client.prepare.return_value
 
-    def test_get_info(self):
+    def test_is_singleton(self):
+        self.assertEqual(id(self.rpc),
+                         id(resources_rpc.ResourcesPullRpcApi()))
+
+    def test_pull(self):
         policy_dict = self._create_test_policy_dict()
         expected_policy_obj = self._create_test_policy(policy_dict)
         qos_policy_id = policy_dict['id']
         self.mock_cctxt.call.return_value = (
             expected_policy_obj.obj_to_primitive())
-        get_info_result = self.rpc.get_info(
+        pull_result = self.rpc.pull(
             self.context, resources.QOS_POLICY, qos_policy_id)
         self.mock_cctxt.call.assert_called_once_with(
-            self.context, 'get_info', resource_type=resources.QOS_POLICY,
+            self.context, 'pull', resource_type=resources.QOS_POLICY,
             version=policy.QosPolicy.VERSION, resource_id=qos_policy_id)
-        self.assertEqual(expected_policy_obj, get_info_result)
+        self.assertEqual(expected_policy_obj, pull_result)
 
-    def test_get_info_invalid_resource_type_cls(self):
+    def test_pull_invalid_resource_type_cls(self):
         self.assertRaises(
-            resources_rpc.InvalidResourceTypeClass, self.rpc.get_info,
+            resources_rpc.InvalidResourceTypeClass, self.rpc.pull,
             self.context, 'foo_type', 'foo_id')
 
-    def test_get_info_resource_not_found(self):
+    def test_pull_resource_not_found(self):
         policy_dict = self._create_test_policy_dict()
         qos_policy_id = policy_dict['id']
         self.mock_cctxt.call.return_value = None
         self.assertRaises(
-            resources_rpc.ResourceNotFound, self.rpc.get_info, self.context,
-            resources.QOS_POLICY, qos_policy_id)
+            resources_rpc.ResourceNotFound, self.rpc.pull,
+            self.context, resources.QOS_POLICY, qos_policy_id)
 
 
-class ResourcesServerRpcCallbackTestCase(ResourcesRpcBaseTestCase):
+class ResourcesPullRpcCallbackTestCase(ResourcesRpcBaseTestCase):
 
     def setUp(self):
-        super(ResourcesServerRpcCallbackTestCase, self).setUp()
-        self.callbacks = resources_rpc.ResourcesServerRpcCallback()
+        super(ResourcesPullRpcCallbackTestCase, self).setUp()
+        self.callbacks = resources_rpc.ResourcesPullRpcCallback()
 
-    def test_get_info(self):
+    def test_pull(self):
         policy_dict = self._create_test_policy_dict()
         policy_obj = self._create_test_policy(policy_dict)
         qos_policy_id = policy_dict['id']
-        with mock.patch.object(resources_rpc.registry, 'get_info',
+        with mock.patch.object(resources_rpc.registry, 'pull',
                                return_value=policy_obj) as registry_mock:
-            primitive = self.callbacks.get_info(
+            primitive = self.callbacks.pull(
                 self.context, resource_type=resources.QOS_POLICY,
                 version=policy.QosPolicy.VERSION,
                 resource_id=qos_policy_id)
@@ -101,26 +105,26 @@ class ResourcesServerRpcCallbackTestCase(ResourcesRpcBaseTestCase):
         self.assertEqual(policy_obj.obj_to_primitive(), primitive)
 
     @mock.patch.object(policy.QosPolicy, 'obj_to_primitive')
-    def test_get_info_no_backport_for_latest_version(self, to_prim_mock):
+    def test_pull_no_backport_for_latest_version(self, to_prim_mock):
         policy_dict = self._create_test_policy_dict()
         policy_obj = self._create_test_policy(policy_dict)
         qos_policy_id = policy_dict['id']
-        with mock.patch.object(resources_rpc.registry, 'get_info',
+        with mock.patch.object(resources_rpc.registry, 'pull',
                                return_value=policy_obj):
-            self.callbacks.get_info(
+            self.callbacks.pull(
                 self.context, resource_type=resources.QOS_POLICY,
                 version=policy.QosPolicy.VERSION,
                 resource_id=qos_policy_id)
             to_prim_mock.assert_called_with(target_version=None)
 
     @mock.patch.object(policy.QosPolicy, 'obj_to_primitive')
-    def test_get_info_backports_to_older_version(self, to_prim_mock):
+    def test_pull_backports_to_older_version(self, to_prim_mock):
         policy_dict = self._create_test_policy_dict()
         policy_obj = self._create_test_policy(policy_dict)
         qos_policy_id = policy_dict['id']
-        with mock.patch.object(resources_rpc.registry, 'get_info',
+        with mock.patch.object(resources_rpc.registry, 'pull',
                                return_value=policy_obj):
-            self.callbacks.get_info(
+            self.callbacks.pull(
                 self.context, resource_type=resources.QOS_POLICY,
                 version='0.9',  # less than initial version 1.0
                 resource_id=qos_policy_id)
diff --git a/neutron/tests/unit/services/qos/base.py b/neutron/tests/unit/services/qos/base.py
new file mode 100644 (file)
index 0000000..e731340
--- /dev/null
@@ -0,0 +1,38 @@
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import mock
+
+from neutron.api.rpc.callbacks.consumer import registry as cons_registry
+from neutron.api.rpc.callbacks.producer import registry as prod_registry
+from neutron.api.rpc.callbacks import resource_manager
+from neutron.tests import base
+
+
+class BaseQosTestCase(base.BaseTestCase):
+    def setUp(self):
+        super(BaseQosTestCase, self).setUp()
+
+        with mock.patch.object(
+            resource_manager.ResourceCallbacksManager, '_singleton',
+            new_callable=mock.PropertyMock(return_value=False)):
+
+            self.cons_mgr = resource_manager.ConsumerResourceCallbacksManager()
+            self.prod_mgr = resource_manager.ProducerResourceCallbacksManager()
+            for mgr in (self.cons_mgr, self.prod_mgr):
+                mgr.clear()
+
+        mock.patch.object(
+            cons_registry, '_get_manager', return_value=self.cons_mgr).start()
+
+        mock.patch.object(
+            prod_registry, '_get_manager', return_value=self.prod_mgr).start()
index 6f67fa605b97607c27cd38d425f10fb2db70757a..efc1cbbbb030b5d2b2fd08d0c9f6069207fbc172 100644 (file)
@@ -14,12 +14,11 @@ import mock
 from oslo_config import cfg
 
 from neutron.api.rpc.callbacks import events
-from neutron.api.rpc.callbacks import resources
 from neutron import context
 from neutron.objects.qos import policy as policy_object
 from neutron.services.qos.notification_drivers import manager as driver_mgr
 from neutron.services.qos.notification_drivers import message_queue
-from neutron.tests import base
+from neutron.tests.unit.services.qos import base
 
 DUMMY_DRIVER = ("neutron.tests.unit.services.qos.notification_drivers."
                 "dummy.DummyQosServiceNotificationDriver")
@@ -32,16 +31,12 @@ def _load_multiple_drivers():
         "qos")
 
 
-class TestQosDriversManager(base.BaseTestCase):
+class TestQosDriversManagerBase(base.BaseQosTestCase):
 
     def setUp(self):
-        super(TestQosDriversManager, self).setUp()
+        super(TestQosDriversManagerBase, self).setUp()
         self.config_parse()
         self.setup_coreplugin()
-        self.registry_p = mock.patch(
-                            'neutron.api.rpc.callbacks.registry.notify')
-        self.registry_m = self.registry_p.start()
-        self.driver_manager = driver_mgr.QosServiceNotificationDriverManager()
         config = cfg.ConfigOpts()
         config.register_opts(driver_mgr.QOS_PLUGIN_OPTS, "qos")
         self.policy_data = {'policy': {
@@ -56,17 +51,20 @@ class TestQosDriversManager(base.BaseTestCase):
         ctxt = None
         self.kwargs = {'context': ctxt}
 
+
+class TestQosDriversManager(TestQosDriversManagerBase):
+
+    def setUp(self):
+        super(TestQosDriversManager, self).setUp()
+        self.driver_manager = driver_mgr.QosServiceNotificationDriverManager()
+
     def _validate_registry_params(self, event_type, policy):
-        self.assertTrue(self.registry_m.called, policy)
-        self.registry_m.assert_called_with(
-                resources.QOS_POLICY,
-                event_type,
-                policy)
+        #TODO(QoS): actually validate the notification once implemented
+        pass
 
     def test_create_policy_default_configuration(self):
         #RPC driver should be loaded by default
         self.driver_manager.create_policy(self.policy)
-        self.assertFalse(self.registry_m.called)
 
     def test_update_policy_default_configuration(self):
         #RPC driver should be loaded by default
@@ -78,9 +76,11 @@ class TestQosDriversManager(base.BaseTestCase):
         self.driver_manager.delete_policy(self.policy)
         self._validate_registry_params(events.DELETED, self.policy)
 
+
+class TestQosDriversManagerMulti(TestQosDriversManagerBase):
+
     def _test_multi_drivers_configuration_op(self, op):
         _load_multiple_drivers()
-        # create a new manager with new configuration
         driver_manager = driver_mgr.QosServiceNotificationDriverManager()
         handler = '%s_policy' % op
         with mock.patch('.'.join([DUMMY_DRIVER, handler])) as dummy_mock:
index a4f163f54b26f5b6bf1e1556aae52438e6429fed..710451307a9f6ddf9b2ce366e17a8a582387ce26 100644 (file)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-import mock
-
 from neutron.api.rpc.callbacks import events
-from neutron.api.rpc.callbacks import resources
 from neutron import context
 from neutron.objects.qos import policy as policy_object
 from neutron.objects.qos import rule as rule_object
 from neutron.services.qos.notification_drivers import message_queue
-from neutron.tests import base
+from neutron.tests.unit.services.qos import base
 
 DB_PLUGIN_KLASS = 'neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
 
 
-class TestQosRpcNotificationDriver(base.BaseTestCase):
+class TestQosRpcNotificationDriver(base.BaseQosTestCase):
 
     def setUp(self):
         super(TestQosRpcNotificationDriver, self).setUp()
-
-        registry_p = mock.patch(
-                            'neutron.api.rpc.callbacks.registry.notify')
-        self.registry_m = registry_p.start()
         self.driver = message_queue.RpcQosServiceNotificationDriver()
 
         self.policy_data = {'policy': {
@@ -52,21 +45,18 @@ class TestQosRpcNotificationDriver(base.BaseTestCase):
                                 context,
                                 **self.rule_data['bandwidth_limit_rule'])
 
-    def _validate_registry_params(self, event_type, policy):
-        self.assertTrue(self.registry_m.called, policy)
-        self.registry_m.assert_called_once_with(
-                resources.QOS_POLICY,
-                event_type,
-                policy)
+    def _validate_push_params(self, event_type, policy):
+        # TODO(QoS): actually validate push works once implemented
+        pass
 
     def test_create_policy(self):
         self.driver.create_policy(self.policy)
-        self.assertFalse(self.registry_m.called)
+        self._validate_push_params(events.CREATED, self.policy)
 
     def test_update_policy(self):
         self.driver.update_policy(self.policy)
-        self._validate_registry_params(events.UPDATED, self.policy)
+        self._validate_push_params(events.UPDATED, self.policy)
 
     def test_delete_policy(self):
         self.driver.delete_policy(self.policy)
-        self._validate_registry_params(events.DELETED, self.policy)
+        self._validate_push_params(events.DELETED, self.policy)
index 92ef36a0039e356e48d92b4b88e9a4816500ddae..1f530512a19b8d4d2d1789c1fd8807f4ba24555e 100644 (file)
@@ -13,8 +13,6 @@
 import mock
 from oslo_config import cfg
 
-from neutron.api.rpc.callbacks import events
-from neutron.api.rpc.callbacks import resources
 from neutron.common import exceptions as n_exc
 from neutron import context
 from neutron import manager
@@ -22,13 +20,13 @@ from neutron.objects import base as base_object
 from neutron.objects.qos import policy as policy_object
 from neutron.objects.qos import rule as rule_object
 from neutron.plugins.common import constants
-from neutron.tests import base
+from neutron.tests.unit.services.qos import base
 
 
 DB_PLUGIN_KLASS = 'neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
 
 
-class TestQosPlugin(base.BaseTestCase):
+class TestQosPlugin(base.BaseQosTestCase):
 
     def setUp(self):
         super(TestQosPlugin, self).setUp()
@@ -40,15 +38,18 @@ class TestQosPlugin(base.BaseTestCase):
         mock.patch('neutron.db.api.get_object').start()
         mock.patch(
             'neutron.objects.qos.policy.QosPolicy.obj_load_attr').start()
-        self.registry_p = mock.patch(
-            'neutron.api.rpc.callbacks.registry.notify')
-        self.registry_m = self.registry_p.start()
+
         cfg.CONF.set_override("core_plugin", DB_PLUGIN_KLASS)
         cfg.CONF.set_override("service_plugins", ["qos"])
 
         mgr = manager.NeutronManager.get_instance()
         self.qos_plugin = mgr.get_service_plugins().get(
             constants.QOS)
+
+        self.notif_driver_p = mock.patch.object(
+            self.qos_plugin, 'notification_driver_manager')
+        self.notif_driver_m = self.notif_driver_p.start()
+
         self.ctxt = context.Context('fake_user', 'fake_tenant')
         self.policy_data = {
             'policy': {'id': 7777777,
@@ -68,50 +69,48 @@ class TestQosPlugin(base.BaseTestCase):
         self.rule = rule_object.QosBandwidthLimitRule(
             context, **self.rule_data['bandwidth_limit_rule'])
 
-    def _validate_registry_params(self, event_type):
-        self.registry_m.assert_called_once_with(
-            resources.QOS_POLICY,
-            event_type,
-            mock.ANY)
+    def _validate_notif_driver_params(self, method_name):
+        method = getattr(self.notif_driver_m, method_name)
+        self.assertTrue(method.called)
         self.assertIsInstance(
-            self.registry_m.call_args[0][2], policy_object.QosPolicy)
+            method.call_args[0][0], policy_object.QosPolicy)
 
     def test_add_policy(self):
         self.qos_plugin.create_policy(self.ctxt, self.policy_data)
-        self.assertFalse(self.registry_m.called)
+        self._validate_notif_driver_params('create_policy')
 
     def test_update_policy(self):
         fields = base_object.get_updatable_fields(
             policy_object.QosPolicy, self.policy_data['policy'])
         self.qos_plugin.update_policy(
             self.ctxt, self.policy.id, {'policy': fields})
-        self._validate_registry_params(events.UPDATED)
+        self._validate_notif_driver_params('update_policy')
 
     @mock.patch('neutron.db.api.get_object', return_value=None)
     def test_delete_policy(self, *mocks):
         self.qos_plugin.delete_policy(self.ctxt, self.policy.id)
-        self._validate_registry_params(events.DELETED)
+        self._validate_notif_driver_params('delete_policy')
 
     def test_create_policy_rule(self):
         with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
                         return_value=self.policy):
             self.qos_plugin.create_policy_bandwidth_limit_rule(
                 self.ctxt, self.policy.id, self.rule_data)
-            self._validate_registry_params(events.UPDATED)
+            self._validate_notif_driver_params('update_policy')
 
     def test_update_policy_rule(self):
         with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
                         return_value=self.policy):
             self.qos_plugin.update_policy_bandwidth_limit_rule(
                 self.ctxt, self.rule.id, self.policy.id, self.rule_data)
-            self._validate_registry_params(events.UPDATED)
+            self._validate_notif_driver_params('update_policy')
 
     def test_delete_policy_rule(self):
         with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
                         return_value=self.policy):
             self.qos_plugin.delete_policy_bandwidth_limit_rule(
                 self.ctxt, self.rule.id, self.policy.id)
-            self._validate_registry_params(events.UPDATED)
+            self._validate_notif_driver_params('update_policy')
 
     def test_get_policy_for_nonexistent_policy(self):
         with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',